// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package importer

import (
	"context"
	"fmt"
	"io"
	"math"
	"net/url"
	"os"
	"path/filepath"
	"runtime"
	"slices"
	"strings"
	"sync"
	"time"
	"unicode/utf8"

	"github.com/docker/go-units"
	"github.com/pingcap/errors"
	"github.com/pingcap/failpoint"
	"github.com/pingcap/log"
	tidb "github.com/pingcap/tidb/pkg/config"
	"github.com/pingcap/tidb/pkg/config/kerneltype"
	"github.com/pingcap/tidb/pkg/ddl/util"
	"github.com/pingcap/tidb/pkg/dxf/framework/handle"
	"github.com/pingcap/tidb/pkg/dxf/framework/scheduler"
	"github.com/pingcap/tidb/pkg/expression"
	tidbkv "github.com/pingcap/tidb/pkg/kv"
	"github.com/pingcap/tidb/pkg/lightning/backend/local"
	"github.com/pingcap/tidb/pkg/lightning/common"
	"github.com/pingcap/tidb/pkg/lightning/config"
	litlog "github.com/pingcap/tidb/pkg/lightning/log"
	"github.com/pingcap/tidb/pkg/lightning/mydump"
	"github.com/pingcap/tidb/pkg/meta/model"
	"github.com/pingcap/tidb/pkg/objstore"
	"github.com/pingcap/tidb/pkg/objstore/compressedio"
	"github.com/pingcap/tidb/pkg/objstore/storeapi"
	"github.com/pingcap/tidb/pkg/parser"
	"github.com/pingcap/tidb/pkg/parser/ast"
	pformat "github.com/pingcap/tidb/pkg/parser/format"
	"github.com/pingcap/tidb/pkg/parser/mysql"
	"github.com/pingcap/tidb/pkg/parser/terror"
	plannercore "github.com/pingcap/tidb/pkg/planner/core"
	"github.com/pingcap/tidb/pkg/planner/planctx"
	plannerutil "github.com/pingcap/tidb/pkg/planner/util"
	"github.com/pingcap/tidb/pkg/sessionctx"
	"github.com/pingcap/tidb/pkg/sessionctx/vardef"
	"github.com/pingcap/tidb/pkg/sessionctx/variable"
	"github.com/pingcap/tidb/pkg/table"
	tidbutil "github.com/pingcap/tidb/pkg/util"
	"github.com/pingcap/tidb/pkg/util/chunk"
	contextutil "github.com/pingcap/tidb/pkg/util/context"
	"github.com/pingcap/tidb/pkg/util/cpu"
	"github.com/pingcap/tidb/pkg/util/dbterror"
	"github.com/pingcap/tidb/pkg/util/dbterror/exeerrors"
	"github.com/pingcap/tidb/pkg/util/dbterror/plannererrors"
	"github.com/pingcap/tidb/pkg/util/filter"
	"github.com/pingcap/tidb/pkg/util/logutil"
	"github.com/pingcap/tidb/pkg/util/naming"
	sem "github.com/pingcap/tidb/pkg/util/sem/compat"
	"github.com/pingcap/tidb/pkg/util/stringutil"
	pd "github.com/tikv/pd/client"
	"go.uber.org/atomic"
	"go.uber.org/zap"
)

const (
	// DataFormatCSV represents that the data source file of IMPORT INTO is CSV.
	DataFormatCSV = "csv"
	// DataFormatDelimitedData is delimited data.
	DataFormatDelimitedData = "delimited data"
	// DataFormatSQL represents that the data source file of IMPORT INTO is a mydumper-format DML file.
	DataFormatSQL = "sql"
	// DataFormatParquet represents that the data source file of IMPORT INTO is parquet.
	DataFormatParquet = "parquet"
	// DataFormatAuto represents that the format is not set in IMPORT INTO; we will determine the format automatically.
	DataFormatAuto = "auto"

	// DefaultDiskQuota is the default disk quota for IMPORT INTO
	DefaultDiskQuota = config.ByteSize(50 << 30) // 50GiB

	// 0 means no limit
	unlimitedWriteSpeed = config.ByteSize(0)

	characterSetOption = "character_set"
	fieldsTerminatedByOption = "fields_terminated_by"
	fieldsEnclosedByOption = "fields_enclosed_by"
	fieldsEscapedByOption = "fields_escaped_by"
	fieldsDefinedNullByOption = "fields_defined_null_by"
	linesTerminatedByOption = "lines_terminated_by"
	skipRowsOption = "skip_rows"
	groupKeyOption = "group_key"
	splitFileOption = "split_file"
	diskQuotaOption = "disk_quota"
	threadOption = "thread"
	maxWriteSpeedOption = "max_write_speed"
	checksumTableOption = "checksum_table"
	recordErrorsOption = "record_errors"
	detachedOption = "detached"
	// if 'import mode' is enabled, TiKV will:
	// - set level0_stop_writes_trigger = max(old, 1 << 30)
	// - set level0_slowdown_writes_trigger = max(old, 1 << 30)
	// - set soft_pending_compaction_bytes_limit = 0,
	// - set hard_pending_compaction_bytes_limit = 0,
	// - not trigger flow control when the SST count in L0 is large
	// - not trigger region split, which might cause some regions to become
	//   very large and become hotspots, possibly causing latency spikes.
	//
	// default false for local sort, true for global sort.
	disableTiKVImportModeOption = "disable_tikv_import_mode"
	cloudStorageURIOption = ast.CloudStorageURI
	disablePrecheckOption = "disable_precheck"
	// used for test
	maxEngineSizeOption = "__max_engine_size"
	forceMergeStep = "__force_merge_step"
	manualRecoveryOption = "__manual_recovery"
)

var (
	// all supported options.
	// name -> whether the option has value
	supportedOptions = map[string]bool{
		characterSetOption: true,
		fieldsTerminatedByOption: true,
		fieldsEnclosedByOption: true,
		fieldsEscapedByOption: true,
		fieldsDefinedNullByOption: true,
		linesTerminatedByOption: true,
		skipRowsOption: true,
		groupKeyOption: true,
		splitFileOption: false,
		diskQuotaOption: true,
		threadOption: true,
		maxWriteSpeedOption: true,
		checksumTableOption: true,
		recordErrorsOption: true,
		detachedOption: false,
		disableTiKVImportModeOption: false,
		maxEngineSizeOption: true,
		forceMergeStep: false,
		manualRecoveryOption: false,
		cloudStorageURIOption: true,
		disablePrecheckOption: false,
	}

	csvOnlyOptions = map[string]struct{}{
		characterSetOption: {},
		fieldsTerminatedByOption: {},
		fieldsEnclosedByOption: {},
		fieldsEscapedByOption: {},
		fieldsDefinedNullByOption: {},
		linesTerminatedByOption: {},
		skipRowsOption: {},
		splitFileOption: {},
	}

	// we only support global sort on a nextgen cluster when SEM is enabled, and don't
	// allow setting a separate cloud storage URI.
	disallowedOptionsOfNextGen = map[string]struct{}{
		diskQuotaOption: {},
		maxWriteSpeedOption: {},
		cloudStorageURIOption: {},
		threadOption: {},
		checksumTableOption: {},
		recordErrorsOption: {},
	}

	disallowedOptionsForSEM = map[string]struct{}{
		maxEngineSizeOption: {},
		forceMergeStep: {},
		manualRecoveryOption: {},
	}

	allowedOptionsOfImportFromQuery = map[string]struct{}{
		threadOption: {},
		disablePrecheckOption: {},
	}

	// LoadDataReadBlockSize is exposed for test.
	LoadDataReadBlockSize = int64(config.ReadBlockSize)

	supportedSuffixForServerDisk = []string{
		".csv", ".sql", ".parquet",
		".gz", ".gzip",
		".zstd", ".zst",
		".snappy",
	}

	// default character set
	defaultCharacterSet = "utf8mb4"
	// default field null def
	defaultFieldNullDef = []string{`\N`}
)

// DataSourceType indicates the data source type of IMPORT INTO.
type DataSourceType string

const (
	// DataSourceTypeFile represents that the data source of IMPORT INTO is a file.
	// exported for test.
	DataSourceTypeFile DataSourceType = "file"
	// DataSourceTypeQuery represents that the data source of IMPORT INTO is a query.
	DataSourceTypeQuery DataSourceType = "query"
)

// String implements fmt.Stringer.
func (t DataSourceType) String() string {
	return string(t)
}

var (
	// NewClientWithContext returns a kv.Client.
	NewClientWithContext = pd.NewClientWithContext
)

// FieldMapping indicates the relationship between an input field and a table column or user variable.
type FieldMapping struct {
	Column *table.Column
	UserVar *ast.VariableExpr
}

// LoadDataReaderInfo provides information for a data reader of LOAD DATA.
type LoadDataReaderInfo struct {
	// Opener can be called when needed to get an io.ReadSeekCloser. It will only
	// be called once.
	Opener func(ctx context.Context) (io.ReadSeekCloser, error)
	// Remote is not nil only if loading from cloud storage.
	Remote *mydump.SourceFileMeta
}

// Plan describes the plan of LOAD DATA and IMPORT INTO.
type Plan struct {
	DBName string
	DBID int64
	// TableInfo is the table info we use during import; we might change it
	// if adding indexes via SQL is enabled (it's disabled now).
	TableInfo *model.TableInfo
	// DesiredTableInfo is the table info before import, and the desired table info
	// after import.
	DesiredTableInfo *model.TableInfo

	Path string
	// only effective when the data source is file.
	Format string
	// Data interpretation is restrictive if the SQL mode is restrictive and neither
	// the IGNORE nor the LOCAL modifier is specified. Errors terminate the load
	// operation.
	// ref https://dev.mysql.com/doc/refman/8.0/en/load-data.html#load-data-column-assignments
	Restrictive bool

	// Location is used to convert time type for parquet, see
	// https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#timestamp
	Location *time.Location

	SQLMode mysql.SQLMode
	// Charset is the charset of the data file when the file is CSV or TSV.
	// It might be nil when using LOAD DATA and no charset is specified.
	// For IMPORT INTO, it is always non-nil and defaults to defaultCharacterSet.
	Charset *string
	ImportantSysVars map[string]string

	// used for LOAD DATA and CSV format of IMPORT INTO
	FieldNullDef []string
	// this is not used in IMPORT INTO
	NullValueOptEnclosed bool
	// LinesStartingBy is not used in IMPORT INTO
	// FieldsOptEnclosed is not used in either IMPORT INTO or LOAD DATA
	plannercore.LineFieldsInfo
	IgnoreLines uint64

	DiskQuota config.ByteSize
	Checksum config.PostOpLevel
	ThreadCnt int
	MaxNodeCnt int
	MaxWriteSpeed config.ByteSize
	SplitFile bool
	MaxRecordedErrors int64
	Detached bool
	DisableTiKVImportMode bool
	MaxEngineSize config.ByteSize
	CloudStorageURI string
	DisablePrecheck bool
	GroupKey string

	// used for checksum in physical mode
	DistSQLScanConcurrency int

	// todo: remove it when load data code is reverted.
	InImportInto bool
	DataSourceType DataSourceType
	// only initialized for IMPORT INTO, used when creating the job.
	Parameters *ImportParameters `json:"-"`
	// only initialized for IMPORT INTO, used when the format is detected automatically
	specifiedOptions map[string]*plannercore.LoadDataOpt
	// the user who executes the statement, in the form of user@host
	// only initialized for IMPORT INTO
	User string `json:"-"`

	IsRaftKV2 bool
	// total data file size in bytes.
	TotalFileSize int64
	// used in tests to force enable merge-step when using global sort.
	ForceMergeStep bool
	// see ManualRecovery in proto.ExtraParams
	ManualRecovery bool
	// the keyspace name when submitting this job, only for import-into
	Keyspace string
}

// ASTArgs is the arguments for ast.LoadDataStmt.
// TODO: remove this struct and use the struct which can be serialized.
type ASTArgs struct {
	FileLocRef ast.FileLocRefTp
	ColumnsAndUserVars []*ast.ColumnNameOrUserVar
	ColumnAssignments []*ast.Assignment
	OnDuplicate ast.OnDuplicateKeyHandlingType
	FieldsInfo *ast.FieldsClause
	LinesInfo *ast.LinesClause
}

// StepSummary records the amount of data involved in each step.
// The data stored might be inaccurate, such as the number of rows in the encode step.
type StepSummary struct {
	Bytes int64 `json:"input-bytes,omitempty"`
	RowCnt int64 `json:"input-rows,omitempty"`
}

// Summary records the amount of data needed to be processed in each step of the import job.
// This information will be saved into the tidb_import_jobs table after the job is finished.
type Summary struct {
	// EncodeSummary stores the bytes and rows needed to be processed in the encode step.
	// Same for other summaries.
	EncodeSummary StepSummary `json:"encode-summary,omitempty"`

	MergeSummary StepSummary `json:"merge-summary,omitempty"`

	IngestSummary StepSummary `json:"ingest-summary,omitempty"`

	// ImportedRows is the number of rows imported into TiKV.
	// conflicted rows are excluded from this count if using global-sort.
	ImportedRows int64 `json:"row-count,omitempty"`
	ConflictRowCnt uint64 `json:"conflict-row-count,omitempty"`
	// TooManyConflicts indicates there are too many conflicted rows that we
	// cannot deduplicate while collecting their checksum, so we will skip the later
	// checksum step.
	TooManyConflicts bool `json:"too-many-conflicts,omitempty"`
}

// LoadDataController is the load data controller.
// todo: need a better name
type LoadDataController struct {
	*Plan
	*ASTArgs

	// used to synchronize column assignment expression generation.
	colAssignMu sync.Mutex

	Table table.Table

	// FieldMappings describes how each input field (or input column) from the data file
	// is mapped, either to a table column or to a user variable.
	// If there's NO column list clause in the SQL statement, it's the table's columns;
	// otherwise it's the user-defined list.
	FieldMappings []*FieldMapping
	// InsertColumns are the columns stated in the SQL statement to insert.
	// As IMPORT INTO has two places to state columns, in column-vars and in the SET clause,
	// it's computed from both clauses:
	//   - append columns from column-vars to InsertColumns
	//   - append columns from the left-hand side of the SET clause to InsertColumns
	// It's similar to InsertValues.InsertColumns.
	// Note: our behavior is different from MySQL. For example, for table t(a,b):
	//   - "...(a,a) set a=100" is allowed in MySQL, but not in TiDB
	//   - "...(a,b) set b=100" will set b=100 in MySQL, but in TiDB the set is ignored.
	//   - referencing columns in the SET clause is allowed in MySQL, but not in TiDB
	InsertColumns []*table.Column

	logger *zap.Logger
	dataStore storeapi.Storage
	dataFiles []*mydump.SourceFileMeta
	// globalSortStore is used to store sorted data when using global sort.
	globalSortStore storeapi.Storage
	// ExecuteNodesCnt is the count of execute nodes.
	ExecuteNodesCnt int
}

func getImportantSysVars(sctx sessionctx.Context) map[string]string {
	res := map[string]string{}
	for k, defVal := range common.DefaultImportantVariables {
		if val, ok := sctx.GetSessionVars().GetSystemVar(k); ok {
			res[k] = val
		} else {
			res[k] = defVal
		}
	}
	for k, defVal := range common.DefaultImportVariablesTiDB {
		if val, ok := sctx.GetSessionVars().GetSystemVar(k); ok {
			res[k] = val
		} else {
			res[k] = defVal
		}
	}
	return res
}

// NewPlanFromLoadDataPlan creates an import plan from LOAD DATA.
func NewPlanFromLoadDataPlan(userSctx sessionctx.Context, plan *plannercore.LoadData) (*Plan, error) {
	fullTableName := common.UniqueTable(plan.Table.Schema.L, plan.Table.Name.L)
	logger := log.L().With(zap.String("table", fullTableName))
	charset := plan.Charset
	if charset == nil {
		// https://dev.mysql.com/doc/refman/8.0/en/load-data.html#load-data-character-set
		d, err2 := userSctx.GetSessionVars().GetSessionOrGlobalSystemVar(
			context.Background(), vardef.CharsetDatabase)
		if err2 != nil {
			logger.Error("LOAD DATA get charset failed", zap.Error(err2))
		} else {
			charset = &d
		}
	}
	restrictive := userSctx.GetSessionVars().SQLMode.HasStrictMode() &&
		plan.OnDuplicate != ast.OnDuplicateKeyHandlingIgnore

	var ignoreLines uint64
	if plan.IgnoreLines != nil {
		ignoreLines = *plan.IgnoreLines
	}

	var (
		nullDef []string
		nullValueOptEnclosed = false
	)

	lineFieldsInfo := plannercore.NewLineFieldsInfo(plan.FieldsInfo, plan.LinesInfo)
	// todo: move null defined into plannercore.LineFieldsInfo
	// in LOAD DATA there may be multiple null definitions, but in SELECT ... INTO OUTFILE there's only one
	if plan.FieldsInfo != nil && plan.FieldsInfo.DefinedNullBy != nil {
		nullDef = append(nullDef, *plan.FieldsInfo.DefinedNullBy)
		nullValueOptEnclosed = plan.FieldsInfo.NullValueOptEnclosed
	} else if len(lineFieldsInfo.FieldsEnclosedBy) != 0 {
		nullDef = append(nullDef, "NULL")
	}
	if len(lineFieldsInfo.FieldsEscapedBy) != 0 {
		nullDef = append(nullDef, string([]byte{lineFieldsInfo.FieldsEscapedBy[0], 'N'}))
	}

	return &Plan{
		DBName: plan.Table.Schema.L,
		DBID: plan.Table.DBInfo.ID,

		Path: plan.Path,
		Format: DataFormatDelimitedData,
		Restrictive: restrictive,
		FieldNullDef: nullDef,
		NullValueOptEnclosed: nullValueOptEnclosed,
		LineFieldsInfo: lineFieldsInfo,
		IgnoreLines: ignoreLines,

		SQLMode: userSctx.GetSessionVars().SQLMode,
		Charset: charset,
		ImportantSysVars: getImportantSysVars(userSctx),

		DistSQLScanConcurrency: userSctx.GetSessionVars().DistSQLScanConcurrency(),
		DataSourceType: DataSourceTypeFile,
	}, nil
}

// NewImportPlan creates a new import into plan.
func NewImportPlan(ctx context.Context, userSctx sessionctx.Context, plan *plannercore.ImportInto, tbl table.Table) (*Plan, error) {
	failpoint.InjectCall("NewImportPlan", plan)
	var format string
	if plan.Format != nil {
		format = strings.ToLower(*plan.Format)
	} else {
		format = DataFormatAuto
	}
	restrictive := userSctx.GetSessionVars().SQLMode.HasStrictMode()
	lineFieldsInfo := newDefaultLineFieldsInfo()

	p := &Plan{
		TableInfo: tbl.Meta(),
		DesiredTableInfo: tbl.Meta(),
		DBName: plan.Table.Schema.L,
		DBID: plan.Table.DBInfo.ID,

		Path: plan.Path,
		Format: format,
		Restrictive: restrictive,
		FieldNullDef: defaultFieldNullDef,
		LineFieldsInfo: lineFieldsInfo,

		Location: userSctx.GetSessionVars().Location(),
		SQLMode: userSctx.GetSessionVars().SQLMode,
		ImportantSysVars: getImportantSysVars(userSctx),

		DistSQLScanConcurrency: userSctx.GetSessionVars().DistSQLScanConcurrency(),
		InImportInto: true,
		DataSourceType: getDataSourceType(plan),
		User: userSctx.GetSessionVars().User.String(),
		Keyspace: userSctx.GetStore().GetKeyspace(),
	}
	if err := p.initOptions(ctx, userSctx, plan.Options); err != nil {
		return nil, err
	}
	if err := p.initParameters(plan); err != nil {
		return nil, err
	}
	return p, nil
}

func newDefaultLineFieldsInfo() plannercore.LineFieldsInfo {
	// these are the default values for the lightning CSV format too
	return plannercore.LineFieldsInfo{
		FieldsTerminatedBy: `,`,
		FieldsEnclosedBy: `"`,
		FieldsEscapedBy: `\`,
		LinesStartingBy: ``,
		// csv_parser will determine it automatically (either '\r', '\n' or '\r\n'),
		// but the user cannot set this to empty explicitly.
		LinesTerminatedBy: ``,
	}
}

// ASTArgsFromPlan creates ASTArgs from plan.
func ASTArgsFromPlan(plan *plannercore.LoadData) *ASTArgs {
	return &ASTArgs{
		FileLocRef: plan.FileLocRef,
		ColumnsAndUserVars: plan.ColumnsAndUserVars,
		ColumnAssignments: plan.ColumnAssignments,
		OnDuplicate: plan.OnDuplicate,
		FieldsInfo: plan.FieldsInfo,
		LinesInfo: plan.LinesInfo,
	}
}

// ASTArgsFromImportPlan creates ASTArgs from plan.
func ASTArgsFromImportPlan(plan *plannercore.ImportInto) *ASTArgs {
	// FileLocRef is not used in ImportIntoStmt; OnDuplicate is not used now.
	return &ASTArgs{
		FileLocRef: ast.FileLocServerOrRemote,
		ColumnsAndUserVars: plan.ColumnsAndUserVars,
		ColumnAssignments: plan.ColumnAssignments,
		OnDuplicate: ast.OnDuplicateKeyHandlingReplace,
	}
}

// ASTArgsFromStmt creates ASTArgs from statement.
func ASTArgsFromStmt(stmt string) (*ASTArgs, error) {
	stmtNode, err := parser.New().ParseOneStmt(stmt, "", "")
	if err != nil {
		return nil, err
	}
	importIntoStmt, ok := stmtNode.(*ast.ImportIntoStmt)
	if !ok {
		return nil, errors.Errorf("stmt %s is not import into stmt", stmt)
	}
	// FileLocRef is not used in ImportIntoStmt; OnDuplicate is not used now.
	return &ASTArgs{
		FileLocRef: ast.FileLocServerOrRemote,
		ColumnsAndUserVars: importIntoStmt.ColumnsAndUserVars,
		ColumnAssignments: importIntoStmt.ColumnAssignments,
		OnDuplicate: ast.OnDuplicateKeyHandlingReplace,
	}, nil
}

// Option is used to set optional parameters for LoadDataController.
type Option func(c *LoadDataController)

// WithLogger sets the logger for LoadDataController.
func WithLogger(logger *zap.Logger) Option {
	return func(c *LoadDataController) {
		c.logger = logger
	}
}

// NewLoadDataController creates a new controller.
func NewLoadDataController(plan *Plan, tbl table.Table, astArgs *ASTArgs, options ...Option) (*LoadDataController, error) {
	fullTableName := tbl.Meta().Name.String()
	logger := log.L().With(zap.String("table", fullTableName))
	c := &LoadDataController{
		Plan: plan,
		ASTArgs: astArgs,
		Table: tbl,
		logger: logger,
		ExecuteNodesCnt: 1,
	}
	for _, opt := range options {
		opt(c)
	}

	if err := c.checkFieldParams(); err != nil {
		return nil, err
	}

	columnNames := c.initFieldMappings()
	if err := c.initLoadColumns(columnNames); err != nil {
		return nil, err
	}
	return c, nil
}

// InitTiKVConfigs initializes some TiKV related configs.
func (e *LoadDataController) InitTiKVConfigs(ctx context.Context, sctx sessionctx.Context) error {
	isRaftKV2, err := util.IsRaftKv2(ctx, sctx)
	if err != nil {
		return err
	}
	e.Plan.IsRaftKV2 = isRaftKV2
	return nil
}

func (e *LoadDataController) checkFieldParams() error {
	if e.DataSourceType == DataSourceTypeFile && e.Path == "" {
		return exeerrors.ErrLoadDataEmptyPath
	}
	if e.InImportInto {
		if e.Format != DataFormatCSV && e.Format != DataFormatParquet && e.Format != DataFormatSQL && e.Format != DataFormatAuto {
			return exeerrors.ErrLoadDataUnsupportedFormat.GenWithStackByArgs(e.Format)
		}
	} else {
		if e.NullValueOptEnclosed && len(e.FieldsEnclosedBy) == 0 {
			return exeerrors.ErrLoadDataWrongFormatConfig.GenWithStackByArgs("must specify FIELDS [OPTIONALLY] ENCLOSED BY when use NULL DEFINED BY OPTIONALLY ENCLOSED")
		}
		// NOTE: IMPORT INTO also doesn't support setting an empty LinesTerminatedBy or FieldsTerminatedBy,
		// but that's checked in initOptions.
		if len(e.LinesTerminatedBy) == 0 {
			return exeerrors.ErrLoadDataWrongFormatConfig.GenWithStackByArgs("LINES TERMINATED BY is empty")
		}
		// see https://github.com/pingcap/tidb/issues/33298
		if len(e.FieldsTerminatedBy) == 0 {
			return exeerrors.ErrLoadDataWrongFormatConfig.GenWithStackByArgs("load data with empty field terminator")
		}
	}
	if len(e.FieldsEnclosedBy) > 0 &&
		(strings.HasPrefix(e.FieldsEnclosedBy, e.FieldsTerminatedBy) || strings.HasPrefix(e.FieldsTerminatedBy, e.FieldsEnclosedBy)) {
		return exeerrors.ErrLoadDataWrongFormatConfig.GenWithStackByArgs("FIELDS ENCLOSED BY and TERMINATED BY must not be prefix of each other")
	}

	return nil
}

func (p *Plan) initDefaultOptions(ctx context.Context, targetNodeCPUCnt int, store tidbkv.Storage) {
	var threadCnt int
	threadCnt = int(math.Max(1, float64(targetNodeCPUCnt)*0.5))
	if p.DataSourceType == DataSourceTypeQuery {
		threadCnt = 2
	}
	p.Checksum = config.OpLevelRequired
	p.ThreadCnt = threadCnt
	p.MaxWriteSpeed = unlimitedWriteSpeed
	p.SplitFile = false
	p.MaxRecordedErrors = 100
	p.Detached = false
	p.DisableTiKVImportMode = false
	p.MaxEngineSize = getDefMaxEngineSize()
	p.CloudStorageURI = handle.GetCloudStorageURI(ctx, store)

	v := defaultCharacterSet
	p.Charset = &v
}

func getDefMaxEngineSize() config.ByteSize {
	if kerneltype.IsNextGen() {
		return config.DefaultBatchSize
	}
	return config.ByteSize(defaultMaxEngineSize)
}

func (p *Plan) initOptions(ctx context.Context, seCtx sessionctx.Context, options []*plannercore.LoadDataOpt) error {
	targetNodeCPUCnt, err := GetTargetNodeCPUCnt(ctx, p.DataSourceType, p.Path)
	if err != nil {
		return err
	}
	p.initDefaultOptions(ctx, targetNodeCPUCnt, seCtx.GetStore())

	specifiedOptions := map[string]*plannercore.LoadDataOpt{}
	for _, opt := range options {
		hasValue, ok := supportedOptions[opt.Name]
		if !ok {
			return exeerrors.ErrUnknownOption.FastGenByArgs(opt.Name)
		}
		if hasValue && opt.Value == nil || !hasValue && opt.Value != nil {
			return exeerrors.ErrInvalidOptionVal.FastGenByArgs(opt.Name)
		}
		if _, ok = specifiedOptions[opt.Name]; ok {
			return exeerrors.ErrDuplicateOption.FastGenByArgs(opt.Name)
		}
		specifiedOptions[opt.Name] = opt
	}
	p.specifiedOptions = specifiedOptions

	if kerneltype.IsNextGen() && sem.IsEnabled() {
		if p.DataSourceType == DataSourceTypeQuery {
			return plannererrors.ErrNotSupportedWithSem.GenWithStackByArgs("IMPORT INTO from select")
		}
		// we put the check here, not in the planner, to make sure the cloud_storage_uri
		// won't change in between.
		if p.IsLocalSort() {
			return plannererrors.ErrNotSupportedWithSem.GenWithStackByArgs("IMPORT INTO with local sort")
		}
		for k := range disallowedOptionsOfNextGen {
			if _, ok := specifiedOptions[k]; ok {
				return exeerrors.ErrLoadDataUnsupportedOption.GenWithStackByArgs(k, "nextgen kernel")
			}
		}
	}

	if sem.IsEnabled() {
		for k := range disallowedOptionsForSEM {
			if _, ok := specifiedOptions[k]; ok {
				return exeerrors.ErrLoadDataUnsupportedOption.GenWithStackByArgs(k, "SEM enabled")
			}
		}
	}

	// DataFormatAuto means the format is unspecified in the statement;
	// the CSV options below will be validated when initializing data files.
	if p.Format != DataFormatCSV && p.Format != DataFormatAuto {
		for k := range csvOnlyOptions {
			if _, ok := specifiedOptions[k]; ok {
				return exeerrors.ErrLoadDataUnsupportedOption.FastGenByArgs(k, "non-CSV format")
			}
		}
	}
	if p.DataSourceType == DataSourceTypeQuery {
		for k := range specifiedOptions {
			if _, ok := allowedOptionsOfImportFromQuery[k]; !ok {
				return exeerrors.ErrLoadDataUnsupportedOption.FastGenByArgs(k, "import from query")
			}
		}
	}

	optAsString := func(opt *plannercore.LoadDataOpt) (string, error) {
		if opt.Value.GetType(seCtx.GetExprCtx().GetEvalCtx()).GetType() != mysql.TypeVarString {
			return "", exeerrors.ErrInvalidOptionVal.FastGenByArgs(opt.Name)
		}
		val, isNull, err2 := opt.Value.EvalString(seCtx.GetExprCtx().GetEvalCtx(), chunk.Row{})
		if err2 != nil || isNull {
			return "", exeerrors.ErrInvalidOptionVal.FastGenByArgs(opt.Name)
		}
		return val, nil
	}
	optAsInt64 := func(opt *plannercore.LoadDataOpt) (int64, error) {
		// the current parser takes integer and bool as mysql.TypeLonglong
		if opt.Value.GetType(seCtx.GetExprCtx().GetEvalCtx()).GetType() != mysql.TypeLonglong || mysql.HasIsBooleanFlag(opt.Value.GetType(seCtx.GetExprCtx().GetEvalCtx()).GetFlag()) {
			return 0, exeerrors.ErrInvalidOptionVal.FastGenByArgs(opt.Name)
		}
		val, isNull, err2 := opt.Value.EvalInt(seCtx.GetExprCtx().GetEvalCtx(), chunk.Row{})
		if err2 != nil || isNull {
			return 0, exeerrors.ErrInvalidOptionVal.FastGenByArgs(opt.Name)
		}
		return val, nil
	}
	if opt, ok := specifiedOptions[characterSetOption]; ok {
		v, err := optAsString(opt)
		if err != nil || v == "" {
			return exeerrors.ErrInvalidOptionVal.FastGenByArgs(opt.Name)
		}
		_, err = config.ParseCharset(v)
		if err != nil {
			return exeerrors.ErrInvalidOptionVal.FastGenByArgs(opt.Name)
		}
		p.Charset = &v
	}
	if opt, ok := specifiedOptions[fieldsTerminatedByOption]; ok {
		v, err := optAsString(opt)
		if err != nil || v == "" {
			return exeerrors.ErrInvalidOptionVal.FastGenByArgs(opt.Name)
		}
		p.FieldsTerminatedBy = v
	}
	if opt, ok := specifiedOptions[fieldsEnclosedByOption]; ok {
		v, err := optAsString(opt)
		if err != nil || len(v) > 1 {
			return exeerrors.ErrInvalidOptionVal.FastGenByArgs(opt.Name)
		}
		p.FieldsEnclosedBy = v
	}
	if opt, ok := specifiedOptions[fieldsEscapedByOption]; ok {
		v, err := optAsString(opt)
		if err != nil || len(v) > 1 {
			return exeerrors.ErrInvalidOptionVal.FastGenByArgs(opt.Name)
		}
		p.FieldsEscapedBy = v
	}
	if opt, ok := specifiedOptions[fieldsDefinedNullByOption]; ok {
		v, err := optAsString(opt)
		if err != nil {
			return exeerrors.ErrInvalidOptionVal.FastGenByArgs(opt.Name)
		}
		p.FieldNullDef = []string{v}
	}
	if opt, ok := specifiedOptions[linesTerminatedByOption]; ok {
		v, err := optAsString(opt)
		// cannot set the terminator to an empty string explicitly
		if err != nil || v == "" {
			return exeerrors.ErrInvalidOptionVal.FastGenByArgs(opt.Name)
		}
		p.LinesTerminatedBy = v
	}
	if opt, ok := specifiedOptions[skipRowsOption]; ok {
		vInt, err := optAsInt64(opt)
		if err != nil || vInt < 0 {
			return exeerrors.ErrInvalidOptionVal.FastGenByArgs(opt.Name)
		}
		p.IgnoreLines = uint64(vInt)
	}
	if _, ok := specifiedOptions[splitFileOption]; ok {
		p.SplitFile = true
	}
	if opt, ok := specifiedOptions[diskQuotaOption]; ok {
		v, err := optAsString(opt)
		if err != nil {
			return exeerrors.ErrInvalidOptionVal.FastGenByArgs(opt.Name)
		}
		if err = p.DiskQuota.UnmarshalText([]byte(v)); err != nil || p.DiskQuota <= 0 {
			return exeerrors.ErrInvalidOptionVal.FastGenByArgs(opt.Name)
		}
	}
	if opt, ok := specifiedOptions[threadOption]; ok {
		vInt, err := optAsInt64(opt)
		if err != nil || vInt <= 0 {
			return exeerrors.ErrInvalidOptionVal.FastGenByArgs(opt.Name)
		}
		p.ThreadCnt = int(vInt)
	}
	if opt, ok := specifiedOptions[maxWriteSpeedOption]; ok {
		v, err := optAsString(opt)
		if err != nil {
			return exeerrors.ErrInvalidOptionVal.FastGenByArgs(opt.Name)
		}
		if err = p.MaxWriteSpeed.UnmarshalText([]byte(v)); err != nil || p.MaxWriteSpeed < 0 {
			return exeerrors.ErrInvalidOptionVal.FastGenByArgs(opt.Name)
		}
	}
	if opt, ok := specifiedOptions[checksumTableOption]; ok {
		v, err := optAsString(opt)
		if err != nil {
			return exeerrors.ErrInvalidOptionVal.FastGenByArgs(opt.Name)
		}
		if err = p.Checksum.FromStringValue(v); err != nil {
			return exeerrors.ErrInvalidOptionVal.FastGenByArgs(opt.Name)
		}
	}
	if opt, ok := specifiedOptions[groupKeyOption]; ok {
		v, err := optAsString(opt)
		if err != nil || v == "" || naming.CheckWithMaxLen(v, 256) != nil {
			return exeerrors.ErrInvalidOptionVal.FastGenByArgs(opt.Name)
		}
		p.GroupKey = v
	}
	if opt, ok := specifiedOptions[recordErrorsOption]; ok {
		vInt, err := optAsInt64(opt)
		if err != nil || vInt < -1 {
			return exeerrors.ErrInvalidOptionVal.FastGenByArgs(opt.Name)
		}
		p.MaxRecordedErrors = vInt
	}
	if _, ok := specifiedOptions[detachedOption]; ok {
		p.Detached = true
	}
	if _, ok := specifiedOptions[disableTiKVImportModeOption]; ok {
		p.DisableTiKVImportMode = true
	}
	if opt, ok := specifiedOptions[cloudStorageURIOption]; ok {
		v, err := optAsString(opt)
		if err != nil {
			return exeerrors.ErrInvalidOptionVal.FastGenByArgs(opt.Name)
		}
		// set the cloud storage URI to an empty string to force using local sort when
		// the global variable is set.
		if v != "" {
			b, err := objstore.ParseBackend(v, nil)
			if err != nil {
				return exeerrors.ErrInvalidOptionVal.FastGenByArgs(opt.Name)
			}
			// only support s3 and gcs now.
			if b.GetS3() == nil && b.GetGcs() == nil {
				return exeerrors.ErrInvalidOptionVal.FastGenByArgs(opt.Name)
			}
		}
		p.CloudStorageURI = v
	}
	if opt, ok := specifiedOptions[maxEngineSizeOption]; ok {
		v, err := optAsString(opt)
		if err != nil {
			return exeerrors.ErrInvalidOptionVal.FastGenByArgs(opt.Name)
		}
		if err = p.MaxEngineSize.UnmarshalText([]byte(v)); err != nil || p.MaxEngineSize < 0 {
			return exeerrors.ErrInvalidOptionVal.FastGenByArgs(opt.Name)
		}
	}
	if _, ok := specifiedOptions[disablePrecheckOption]; ok {
		p.DisablePrecheck = true
	}
	if _, ok := specifiedOptions[forceMergeStep]; ok {
		p.ForceMergeStep = true
	}
	if _, ok := specifiedOptions[manualRecoveryOption]; ok {
		p.ManualRecovery = true
	}

	if kerneltype.IsClassic() {
		if sv, ok := seCtx.GetSessionVars().GetSystemVar(vardef.TiDBMaxDistTaskNodes); ok {
			p.MaxNodeCnt = variable.TidbOptInt(sv, 0)
			if p.MaxNodeCnt == -1 { // -1 means calculate automatically
				p.MaxNodeCnt = scheduler.CalcMaxNodeCountByStoresNum(ctx, seCtx.GetStore())
			}
		}
	}

	// when split-file is set, the data file will be split into chunks of 256 MiB.
	// skip_rows should be 0 or 1; we add this restriction to simplify the skip_rows
	// logic, so we only need to skip on the first chunk of each data file.
	// The CSV parser limits each row size to LargestEntryLimit (120M), so the first row
	// will NOT cross file chunks.
	if p.SplitFile && p.IgnoreLines > 1 {
		return exeerrors.ErrInvalidOptionVal.FastGenByArgs("skip_rows, should be <= 1 when split-file is enabled")
	}

	if p.SplitFile && len(p.LinesTerminatedBy) == 0 {
		return exeerrors.ErrInvalidOptionVal.FastGenByArgs("lines_terminated_by, should not be empty when use split_file")
	}

	p.adjustOptions(targetNodeCPUCnt)
	return nil
}

func (p *Plan) adjustOptions(targetNodeCPUCnt int) {
	limit := targetNodeCPUCnt
	if p.DataSourceType == DataSourceTypeQuery {
		// for query, rows are produced using 1 thread; the max CPU used is much
		// lower than import from file, so we set the limit to 2*targetNodeCPUCnt.
		// TODO: adjust after spec is ready.
		limit *= 2
	}
	// max value is cpu-count
	if p.ThreadCnt > limit {
		log.L().Info("adjust IMPORT INTO thread count",
			zap.Int("before", p.ThreadCnt), zap.Int("after", limit))
		p.ThreadCnt = limit
	}
	if p.IsGlobalSort() {
		p.DisableTiKVImportMode = true
	}
}

func (p *Plan) initParameters(plan *plannercore.ImportInto) error {
	redactURL := ast.RedactURL(p.Path)
	var columnsAndVars, setClause string
	var sb strings.Builder
	formatCtx := pformat.NewRestoreCtx(pformat.DefaultRestoreFlags, &sb)
	if len(plan.ColumnsAndUserVars) > 0 {
		sb.WriteString("(")
		for i, col := range plan.ColumnsAndUserVars {
			if i > 0 {
				sb.WriteString(", ")
			}
			_ = col.Restore(formatCtx)
		}
		sb.WriteString(")")
		columnsAndVars = sb.String()
	}
	if len(plan.ColumnAssignments) > 0 {
		sb.Reset()
		for i, assign := range plan.ColumnAssignments {
			if i > 0 {
				sb.WriteString(", ")
			}
			_ = assign.Restore(formatCtx)
		}
		setClause = sb.String()
	}
	optionMap := make(map[string]any, len(plan.Options))
	for _, opt := range plan.Options {
		if opt.Value != nil {
			// The options attached to the import statement here are all
			// parameters entered by the user. TiDB processes user-entered
			// parameters as constants, so we can directly convert the value
			// to a constant.
			cons := opt.Value.(*expression.Constant)
			val := fmt.Sprintf("%v", cons.Value.GetValue())
			if opt.Name == cloudStorageURIOption {
				val = ast.RedactURL(val)
			}
			optionMap[opt.Name] = val
		} else {
			optionMap[opt.Name] = nil
		}
	}
	p.Parameters = &ImportParameters{
		ColumnsAndVars: columnsAndVars,
		SetClause: setClause,
		FileLocation: redactURL,
		Format: p.Format,
		Options: optionMap,
	}
	return nil
}

func (e *LoadDataController) tableVisCols2FieldMappings() ([]*FieldMapping, []string) {
	tableCols := e.Table.VisibleCols()
	mappings := make([]*FieldMapping, 0, len(tableCols))
	names := make([]string, 0, len(tableCols))
	for _, v := range tableCols {
		// Data for generated column is generated from the other rows rather than from the parsed data.
		fieldMapping := &FieldMapping{
			Column: v,
		}
		mappings = append(mappings, fieldMapping)
		names = append(names, v.Name.O)
	}
	return mappings, names
}

// initFieldMappings makes a field mapping slice to implicitly map each input field to a table column or user-defined variable.
// The slice's order is the same as the order of the input fields.
// It returns a slice of column names in the same order, without user-defined variable names.
func (e *LoadDataController) initFieldMappings() []string {
	columns := make([]string, 0, len(e.ColumnsAndUserVars)+len(e.ColumnAssignments))
	tableCols := e.Table.VisibleCols()

	if len(e.ColumnsAndUserVars) == 0 {
		e.FieldMappings, columns = e.tableVisCols2FieldMappings()

		return columns
	}

	var column *table.Column

	for _, v := range e.ColumnsAndUserVars {
		if v.ColumnName != nil {
			column = table.FindCol(tableCols, v.ColumnName.Name.O)
			columns = append(columns, v.ColumnName.Name.O)
		} else {
			column = nil
		}

		fieldMapping := &FieldMapping{
			Column: column,
			UserVar: v.UserVar,
		}
		e.FieldMappings = append(e.FieldMappings, fieldMapping)
	}

	return columns
}

// initLoadColumns sets the columns to which the input fields are loaded.
func (e *LoadDataController) initLoadColumns(columnNames []string) error {
	var cols []*table.Column
	var missingColName string
	var err error
	tableCols := e.Table.VisibleCols()

	if len(columnNames) != len(tableCols) {
		for _, v := range e.ColumnAssignments {
			columnNames = append(columnNames, v.Column.Name.O)
		}
	}

	cols, missingColName = table.FindCols(tableCols, columnNames, e.Table.Meta().PKIsHandle)
	if missingColName != "" {
		return dbterror.ErrBadField.GenWithStackByArgs(missingColName, "field list")
	}

	e.InsertColumns = append(e.InsertColumns, cols...)

	// e.InsertColumns is appended according to the original table's column sequence.
	// We have to reorder it to follow the user-specified column order given in columnNames.
	if err = e.reorderColumns(columnNames); err != nil {
		return err
	}

	// Check that each column is specified only once.
	err = table.CheckOnce(cols)
	if err != nil {
		return err
	}

	return nil
}

// reorderColumns reorders e.InsertColumns according to the order of columnNames.
// Note: we must ensure there is a one-to-one mapping between e.InsertColumns and columnNames in terms of column name.
func (e *LoadDataController) reorderColumns(columnNames []string) error {
	cols := e.InsertColumns

	if len(cols) != len(columnNames) {
		return exeerrors.ErrColumnsNotMatched
	}

	reorderedColumns := make([]*table.Column, len(cols))

	if columnNames == nil {
		return nil
	}

	mapping := make(map[string]int)
	for idx, colName := range columnNames {
		mapping[strings.ToLower(colName)] = idx
	}

	for _, col := range cols {
		idx := mapping[col.Name.L]
		reorderedColumns[idx] = col
	}

	e.InsertColumns = reorderedColumns

	return nil
}

// GetFieldCount gets the field count.
func (e *LoadDataController) GetFieldCount() int {
	return len(e.FieldMappings)
}

// GenerateCSVConfig generates a CSV config for parser from LoadDataWorker.
func (e *LoadDataController) GenerateCSVConfig() *config.CSVConfig {
	csvConfig := &config.CSVConfig{
		FieldsTerminatedBy: e.FieldsTerminatedBy,
		// ignore optionally enclosed
		FieldsEnclosedBy: e.FieldsEnclosedBy,
		LinesTerminatedBy: e.LinesTerminatedBy,
		NotNull: false,
		FieldNullDefinedBy: e.FieldNullDef,
		Header: false,
		TrimLastEmptyField: false,
		FieldsEscapedBy: e.FieldsEscapedBy,
		LinesStartingBy: e.LinesStartingBy,
	}
	if !e.InImportInto {
		// for load data
		csvConfig.AllowEmptyLine = true
		csvConfig.QuotedNullIsText = !e.NullValueOptEnclosed
		csvConfig.UnescapedQuote = true
	}
	return csvConfig
}

// InitDataStore initializes the data store.
func (e *LoadDataController) InitDataStore(ctx context.Context) error {
	u, err2 := objstore.ParseRawURL(e.Path)
	if err2 != nil {
		return exeerrors.ErrLoadDataInvalidURI.GenWithStackByArgs(plannercore.ImportIntoDataSource,
			err2.Error())
	}

	if objstore.IsLocal(u) {
		u.Path = filepath.Dir(e.Path)
	} else {
		u.Path = ""
	}
	s, err := initExternalStore(ctx, u, plannercore.ImportIntoDataSource)
	if err != nil {
		return err
	}
	e.dataStore = s

	if e.IsGlobalSort() {
		store, err3 := GetSortStore(ctx, e.Plan.CloudStorageURI)
		if err3 != nil {
			return err3
		}
		e.globalSortStore = store
	}
	return nil
}

// Close closes all the resources.
func (e *LoadDataController) Close() {
	if e.dataStore != nil {
		e.dataStore.Close()
	}
	if e.globalSortStore != nil {
		e.globalSortStore.Close()
	}
}

// GetSortStore gets the sort store.
func GetSortStore(ctx context.Context, url string) (storeapi.Storage, error) {
	u, err := objstore.ParseRawURL(url)
	target := "cloud storage"
	if err != nil {
		return nil, exeerrors.ErrLoadDataInvalidURI.GenWithStackByArgs(target, err.Error())
	}
	return initExternalStore(ctx, u, target)
}

func initExternalStore(ctx context.Context, u *url.URL, target string) (storeapi.Storage, error) {
	b, err2 := objstore.ParseBackendFromURL(u, nil)
	if err2 != nil {
		return nil, exeerrors.ErrLoadDataInvalidURI.GenWithStackByArgs(target, errors.GetErrStackMsg(err2))
	}

	s, err := objstore.NewWithDefaultOpt(ctx, b)
	if err != nil {
		return nil, exeerrors.ErrLoadDataCantAccess.GenWithStackByArgs(target, errors.GetErrStackMsg(err))
	}
	return s, nil
}

func estimateCompressionRatio(
	ctx context.Context,
	filePath string,
	fileSize int64,
	tp mydump.SourceType,
	store storeapi.Storage,
) (float64, error) {
	if tp != mydump.SourceTypeParquet {
		return 1.0, nil
	}
	failpoint.Inject("skipEstimateCompressionForParquet", func(val failpoint.Value) {
		if v, ok := val.(bool); ok && v {
			failpoint.Return(2.0, nil)
		}
	})
	rows, rowSize, err := mydump.SampleStatisticsFromParquet(ctx, filePath, store)
	if err != nil {
		return 1.0, err
	}
	// No row in the file, use 2.0 as the default compression ratio.
	if rowSize == 0 || rows == 0 {
		return 2.0, nil
	}

	compressionRatio := (rowSize * float64(rows)) / float64(fileSize)
	return compressionRatio, nil
}
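
// The sketch below is illustrative only and not part of the original file: it
// shows how an expansion ratio like the one computed above translates a raw
// parquet file size into an estimated decoded data size. All numbers are made
// up for illustration.
func estimateExpandedSizeExample() int64 {
	const (
		fileSize    = int64(25 << 20) // 25 MiB on disk
		sampledRows = int64(1_000_000)
		avgRowSize  = 100.0 // average decoded bytes per row, from sampling
	)
	ratio := (avgRowSize * float64(sampledRows)) / float64(fileSize)
	// the decoded data is roughly ratio times larger than the file on disk
	return int64(float64(fileSize) * ratio) // ≈ 100 MB here
}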

// maxSampledCompressedFiles indicates the max number of files we use to sample
// the compression ratio for each compression type. Consider the extreme case that
// user data contains all 3 compression types. Then we need to sample about 1,500
// files. Suppose each file costs 0.5 second (for example, cross-region access),
// we can still finish within one minute with 16 concurrency.
const maxSampledCompressedFiles = 512

// compressionEstimator estimates the compression ratio for different compression types.
// It uses the harmonic mean to get the average compression ratio.
type compressionEstimator struct {
	mu sync.Mutex
	records map[mydump.Compression][]float64
	ratio sync.Map
}

func newCompressionRecorder() *compressionEstimator {
	return &compressionEstimator{
		records: make(map[mydump.Compression][]float64),
	}
}

func getHarmonicMean(rs []float64) float64 {
	if len(rs) == 0 {
		return 1.0
	}
	var (
		sumInverse float64
		count int
	)
	for _, r := range rs {
		if r > 0 {
			sumInverse += 1.0 / r
			count++
		}
	}

	if count == 0 {
		return 1.0
	}
	return float64(count) / sumInverse
}
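
// The example below is an illustrative sketch and not part of the original
// file: it shows why the harmonic mean is preferred above. With sampled ratios
// {2.0, 2.0, 200.0}, the arithmetic mean (68) is dominated by the single
// outlier, while the harmonic mean (≈2.99) stays close to the typical value.
func harmonicMeanExample() (harmonic, arithmetic float64) {
	rs := []float64{2.0, 2.0, 200.0}
	harmonic = getHarmonicMean(rs) // 3 / (1/2 + 1/2 + 1/200) ≈ 2.99
	for _, r := range rs {
		arithmetic += r
	}
	arithmetic /= float64(len(rs)) // (2 + 2 + 200) / 3 = 68
	return harmonic, arithmetic
}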

func (r *compressionEstimator) estimate(
	ctx context.Context,
	fileMeta mydump.SourceFileMeta,
	store storeapi.Storage,
) float64 {
	compressTp := mydump.ParseCompressionOnFileExtension(fileMeta.Path)
	if compressTp == mydump.CompressionNone {
		return 1.0
	}
	if v, ok := r.ratio.Load(compressTp); ok {
		return v.(float64)
	}

	compressRatio, err := mydump.SampleFileCompressRatio(ctx, fileMeta, store)
	if err != nil {
		logutil.Logger(ctx).Error("fail to calculate data file compress ratio",
			zap.String("category", "loader"),
			zap.String("path", fileMeta.Path),
			zap.Stringer("type", fileMeta.Type), zap.Error(err),
		)
		return 1.0
	}

	r.mu.Lock()
	defer r.mu.Unlock()

	if _, ok := r.ratio.Load(compressTp); ok {
		return compressRatio
	}

	if r.records[compressTp] == nil {
		r.records[compressTp] = make([]float64, 0, 256)
	}
	if len(r.records[compressTp]) < maxSampledCompressedFiles {
		r.records[compressTp] = append(r.records[compressTp], compressRatio)
	}
	if len(r.records[compressTp]) >= maxSampledCompressedFiles {
		// Using the harmonic mean can better handle outlier values.
		compressRatio = getHarmonicMean(r.records[compressTp])
		r.ratio.Store(compressTp, compressRatio)
	}
	return compressRatio
}

// InitDataFiles initializes the data store and files.
// It will call InitDataStore internally.
func (e *LoadDataController) InitDataFiles(ctx context.Context) error {
	u, err2 := objstore.ParseRawURL(e.Path)
	if err2 != nil {
		return exeerrors.ErrLoadDataInvalidURI.GenWithStackByArgs(plannercore.ImportIntoDataSource,
			err2.Error())
	}

	var fileNameKey string
	if objstore.IsLocal(u) {
		// LOAD DATA doesn't support server files.
		if !e.InImportInto {
			return exeerrors.ErrLoadDataFromServerDisk.GenWithStackByArgs(e.Path)
		}

		if !filepath.IsAbs(e.Path) {
			return exeerrors.ErrLoadDataInvalidURI.GenWithStackByArgs(plannercore.ImportIntoDataSource,
				"file location should be absolute path when import from server disk")
		}
		// we add this check for security: we don't want users to import any sensitive system files,
		// most of which are readable text files without a suffix, such as /etc/passwd
		if !slices.Contains(supportedSuffixForServerDisk, strings.ToLower(filepath.Ext(e.Path))) {
			return exeerrors.ErrLoadDataInvalidURI.GenWithStackByArgs(plannercore.ImportIntoDataSource,
				"the file suffix is not supported when import from server disk")
		}
		dir := filepath.Dir(e.Path)
		_, err := os.Stat(dir)
		if err != nil {
			// permission denied / file not exist error, etc.
			return exeerrors.ErrLoadDataInvalidURI.GenWithStackByArgs(plannercore.ImportIntoDataSource,
				err.Error())
		}

		fileNameKey = filepath.Base(e.Path)
	} else {
		fileNameKey = strings.Trim(u.Path, "/")
	}
	// try to find pattern errors in advance
	_, err2 = filepath.Match(stringutil.EscapeGlobQuestionMark(fileNameKey), "")
	if err2 != nil {
		return exeerrors.ErrLoadDataInvalidURI.GenWithStackByArgs(plannercore.ImportIntoDataSource,
			"Glob pattern error: "+err2.Error())
	}

	if err2 = e.InitDataStore(ctx); err2 != nil {
		return err2
	}

	s := e.dataStore
	var (
		totalSize int64
		sourceType mydump.SourceType
		// sizeExpansionRatio is the estimated size expansion for the parquet format.
		// For non-parquet formats, it's always 1.0.
		sizeExpansionRatio = 1.0
	)
	dataFiles := []*mydump.SourceFileMeta{}
	isAutoDetectingFormat := e.Format == DataFormatAuto
	// check whether a glob pattern is present in the filename.
	idx := strings.IndexAny(fileNameKey, "*[")
	// simple path when the path represents one file
	if idx == -1 {
		fileReader, err2 := s.Open(ctx, fileNameKey, nil)
		if err2 != nil {
			return exeerrors.ErrLoadDataCantRead.GenWithStackByArgs(errors.GetErrStackMsg(err2), "Please check the file location is correct")
		}
		defer func() {
			terror.Log(fileReader.Close())
		}()
		size, err3 := fileReader.Seek(0, io.SeekEnd)
		if err3 != nil {
			return exeerrors.ErrLoadDataCantRead.GenWithStackByArgs(errors.GetErrStackMsg(err3), "failed to read file size by seek")
		}
		e.detectAndUpdateFormat(fileNameKey)
		sourceType = e.getSourceType()
		compressionRatio, err := estimateCompressionRatio(ctx, fileNameKey, size, sourceType, s)
		if err != nil {
			return errors.Trace(err)
		}
		compressTp := mydump.ParseCompressionOnFileExtension(fileNameKey)
		fileMeta := mydump.SourceFileMeta{
			Path: fileNameKey,
			FileSize: size,
			Compression: compressTp,
			Type: sourceType,
		}
		fileMeta.RealSize = mydump.EstimateRealSizeForFile(ctx, fileMeta, s)
		fileMeta.RealSize = int64(float64(fileMeta.RealSize) * compressionRatio)
		dataFiles = append(dataFiles, &fileMeta)
		totalSize = size
	} else {
		var commonPrefix string
		if !objstore.IsLocal(u) {
			// for a local directory we're walking the parent directory,
			// so we don't have a common prefix as cloud storage does.
			commonPrefix = fileNameKey[:idx]
		}
		// when importing from server disk, all entries in the parent directory should have READ
		// access, else walkDir will fail.
		// we only support '*'; in order to reuse the glob library, manually escape the path
		escapedPath := stringutil.EscapeGlobQuestionMark(fileNameKey)

		allFiles := make([]mydump.RawFile, 0, 16)
		if err := s.WalkDir(ctx, &storeapi.WalkOption{ObjPrefix: commonPrefix, SkipSubDir: true},
			func(remotePath string, size int64) error {
				allFiles = append(allFiles, mydump.RawFile{Path: remotePath, Size: size})
				return nil
			}); err != nil {
			return exeerrors.ErrLoadDataCantRead.GenWithStackByArgs(errors.GetErrStackMsg(err), "failed to walk dir")
		}

		var err error
		var processedFiles []*mydump.SourceFileMeta
		var once sync.Once

		ce := newCompressionRecorder()

		if processedFiles, err = mydump.ParallelProcess(ctx, allFiles, e.ThreadCnt*2,
			func(ctx context.Context, f mydump.RawFile) (*mydump.SourceFileMeta, error) {
				// we have checked in LoadDataExec.Next
				//nolint: errcheck
				match, _ := filepath.Match(escapedPath, f.Path)
				if !match {
					return nil, nil
				}
				path, size := f.Path, f.Size
				// pick an arbitrary file to detect the format.
				var err2 error
				once.Do(func() {
					e.detectAndUpdateFormat(path)
					sourceType = e.getSourceType()
					sizeExpansionRatio, err2 = estimateCompressionRatio(ctx, path, size, sourceType, s)
				})
				if err2 != nil {
					return nil, err2
				}
				compressTp := mydump.ParseCompressionOnFileExtension(path)
				fileMeta := mydump.SourceFileMeta{
					Path: path,
					FileSize: size,
					Compression: compressTp,
					Type: sourceType,
				}
				fileMeta.RealSize = int64(ce.estimate(ctx, fileMeta, s) * float64(fileMeta.FileSize))
				fileMeta.RealSize = int64(float64(fileMeta.RealSize) * sizeExpansionRatio)
				return &fileMeta, nil
			}); err != nil {
			return err
		}
		// filter unmatched files
		for _, f := range processedFiles {
			if f != nil {
				dataFiles = append(dataFiles, f)
				totalSize += f.FileSize
			}
		}
	}
	if e.InImportInto && isAutoDetectingFormat && e.Format != DataFormatCSV {
		if err2 = e.checkNonCSVFormatOptions(); err2 != nil {
			return err2
		}
	}

	e.dataFiles = dataFiles
	e.TotalFileSize = totalSize

	return nil
}

// CalResourceParams calculates resource related parameters according to the total
// file size and target node CPU count.
func (e *LoadDataController) CalResourceParams(ctx context.Context, ksCodec []byte) error {
	start := time.Now()
	targetNodeCPUCnt, err := handle.GetCPUCountOfNode(ctx)
	if err != nil {
		return err
	}
	factors, err := handle.GetScheduleTuneFactors(ctx, e.Keyspace)
	if err != nil {
		return err
	}
	totalSize := e.TotalFileSize
	failpoint.InjectCall("mockImportDataSize", &totalSize)
	numOfIndexGenKV := GetNumOfIndexGenKV(e.TableInfo)
	var indexSizeRatio float64
	if numOfIndexGenKV > 0 {
		indexSizeRatio, err = e.sampleIndexSizeRatio(ctx, ksCodec)
		if err != nil {
			e.logger.Warn("meet error when sampling index size ratio", zap.Error(err))
		}
	}
	cal := scheduler.NewRCCalc(totalSize, targetNodeCPUCnt, indexSizeRatio, factors)
	e.ThreadCnt = cal.CalcRequiredSlots()
	e.MaxNodeCnt = cal.CalcMaxNodeCountForImportInto()
	e.DistSQLScanConcurrency = scheduler.CalcDistSQLConcurrency(e.ThreadCnt, e.MaxNodeCnt, targetNodeCPUCnt)
	e.logger.Info("auto calculate resource related params",
		zap.Int("thread", e.ThreadCnt),
		zap.Int("maxNode", e.MaxNodeCnt),
		zap.Int("distsqlScanConcurrency", e.DistSQLScanConcurrency),
		zap.Int("targetNodeCPU", targetNodeCPUCnt),
		zap.String("totalFileSize", units.BytesSize(float64(totalSize))),
		zap.Int("fileCount", len(e.dataFiles)),
		zap.Int("numOfIndexGenKV", numOfIndexGenKV),
		zap.Float64("indexSizeRatio", indexSizeRatio),
		zap.Float64("amplifyFactor", factors.AmplifyFactor),
		zap.Duration("costTime", time.Since(start)),
	)
	return nil
}

// detectAndUpdateFormat updates the format of the validated file based on its extension.
func (e *LoadDataController) detectAndUpdateFormat(path string) {
	if e.Format == DataFormatAuto {
		e.Format = parseFileType(path)
		e.logger.Info("detect and update import plan format based on file extension",
			zap.String("file", path), zap.String("detected format", e.Format))
		e.Parameters.Format = e.Format
	}
}

func parseFileType(path string) string {
	path = strings.ToLower(path)
	ext := filepath.Ext(path)
	// strip the compression extension first so the inner data extension is used
	if ext == ".gz" || ext == ".gzip" || ext == ".zstd" || ext == ".zst" || ext == ".snappy" {
		path = strings.TrimSuffix(path, ext)
		ext = filepath.Ext(path)
	}
	switch ext {
	case ".sql":
		return DataFormatSQL
	case ".parquet":
		return DataFormatParquet
	default:
		// if the file does not have an extension, use CSV as the default format
		return DataFormatCSV
	}
}
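
// The map below is an illustrative sketch and not part of the original file:
// it records what parseFileType returns for a few hypothetical paths, showing
// that compression suffixes are stripped first and unknown or missing
// extensions fall back to CSV.
func parseFileTypeExamples() map[string]string {
	return map[string]string{
		"data.csv":          parseFileType("data.csv"),          // "csv"
		"dump.SQL.gz":       parseFileType("dump.SQL.gz"),       // "sql" (compression suffix stripped, case-insensitive)
		"part-0001.parquet": parseFileType("part-0001.parquet"), // "parquet"
		"events.zst":        parseFileType("events.zst"),        // "csv" (no inner extension)
		"README":            parseFileType("README"),            // "csv" (fallback)
	}
}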

func (e *LoadDataController) getSourceType() mydump.SourceType {
	switch e.Format {
	case DataFormatParquet:
		return mydump.SourceTypeParquet
	case DataFormatDelimitedData, DataFormatCSV:
		return mydump.SourceTypeCSV
	default:
		// DataFormatSQL
		return mydump.SourceTypeSQL
	}
}

// GetLoadDataReaderInfos returns the LoadDataReaderInfo for each data file.
func (e *LoadDataController) GetLoadDataReaderInfos() []LoadDataReaderInfo {
	result := make([]LoadDataReaderInfo, 0, len(e.dataFiles))
	for i := range e.dataFiles {
		f := e.dataFiles[i]
		result = append(result, LoadDataReaderInfo{
			Opener: func(ctx context.Context) (io.ReadSeekCloser, error) {
				fileReader, err2 := mydump.OpenReader(ctx, f, e.dataStore, compressedio.DecompressConfig{})
				if err2 != nil {
					return nil, exeerrors.ErrLoadDataCantRead.GenWithStackByArgs(errors.GetErrStackMsg(err2), "Please check the INFILE path is correct")
				}
				return fileReader, nil
			},
			Remote: f,
		})
	}
	return result
}

// GetParser returns a parser for the data file.
func (e *LoadDataController) GetParser(
	ctx context.Context,
	dataFileInfo LoadDataReaderInfo,
) (parser mydump.Parser, err error) {
	reader, err2 := dataFileInfo.Opener(ctx)
	if err2 != nil {
		return nil, err2
	}
	defer func() {
		if err != nil {
			if err3 := reader.Close(); err3 != nil {
				e.logger.Warn("failed to close reader", zap.Error(err3))
			}
		}
	}()
	switch e.Format {
	case DataFormatDelimitedData, DataFormatCSV:
		var charsetConvertor *mydump.CharsetConvertor
		if e.Charset != nil {
			charsetConvertor, err = mydump.NewCharsetConvertor(*e.Charset, string(utf8.RuneError))
			if err != nil {
				return nil, err
			}
		}
		if err != nil {
			return nil, err
		}
		parser, err = mydump.NewCSVParser(
			ctx,
			e.GenerateCSVConfig(),
			reader,
			LoadDataReadBlockSize,
			nil,
			false,
			charsetConvertor)
	case DataFormatSQL:
		parser = mydump.NewChunkParser(
			ctx,
			e.SQLMode,
			reader,
			LoadDataReadBlockSize,
			nil,
		)
	case DataFormatParquet:
		parser, err = mydump.NewParquetParser(
			ctx,
			e.dataStore,
			reader,
			dataFileInfo.Remote.Path,
			dataFileInfo.Remote.ParquetMeta,
		)
	}
	if err != nil {
		return nil, exeerrors.ErrLoadDataWrongFormatConfig.GenWithStack(err.Error())
	}
	parser.SetLogger(litlog.Logger{Logger: logutil.Logger(ctx)})

	return parser, nil
}

// HandleSkipNRows skips the first N rows of the data file.
func (e *LoadDataController) HandleSkipNRows(parser mydump.Parser) error {
	// handle IGNORE N LINES
	ignoreOneLineFn := parser.ReadRow
	if csvParser, ok := parser.(*mydump.CSVParser); ok {
		ignoreOneLineFn = func() error {
			_, _, err3 := csvParser.ReadUntilTerminator()
			return err3
		}
	}

	ignoreLineCnt := e.IgnoreLines
	for ignoreLineCnt > 0 {
		err := ignoreOneLineFn()
		if err != nil {
			if errors.Cause(err) == io.EOF {
				return nil
			}
			return err
		}

		ignoreLineCnt--
	}
	return nil
}

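// toMyDumpFiles wraps the controller's data files into mydump.FileInfo entries
// bound to the target table.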
func (e *LoadDataController) toMyDumpFiles() []mydump.FileInfo {
	tbl := filter.Table{
		Schema: e.DBName,
		Name:   e.Table.Meta().Name.O,
	}
	res := []mydump.FileInfo{}
	for _, f := range e.dataFiles {
		res = append(res, mydump.FileInfo{
			TableName: tbl,
			FileMeta:  *f,
		})
	}
	return res
}

// IsLocalSort returns true if we sort data on local disk.
func (p *Plan) IsLocalSort() bool {
	return p.CloudStorageURI == ""
}

// IsGlobalSort returns true if we sort data on global storage.
func (p *Plan) IsGlobalSort() bool {
	return !p.IsLocalSort()
}

// checkNonCSVFormatOptions reports an error if CSV-only options are specified
// for a non-CSV format; we check this again after the format is detected
// automatically.
func (p *Plan) checkNonCSVFormatOptions() error {
	for k := range csvOnlyOptions {
		if _, ok := p.specifiedOptions[k]; ok {
			return exeerrors.ErrLoadDataUnsupportedOption.FastGenByArgs(k, "non-CSV format")
		}
	}
	return nil
}

// CreateColAssignExprs creates the column assignment expressions using session context.
// RewriteAstExpr modifies the AST node in place (via xxNode.Accept); although it
// doesn't change the node content, we synchronize access to it.
func (e *LoadDataController) CreateColAssignExprs(planCtx planctx.PlanContext) (
	_ []expression.Expression,
	_ []contextutil.SQLWarn,
	retErr error,
) {
	e.colAssignMu.Lock()
	defer e.colAssignMu.Unlock()
	res := make([]expression.Expression, 0, len(e.ColumnAssignments))
	allWarnings := []contextutil.SQLWarn{}
	for _, assign := range e.ColumnAssignments {
		newExpr, err := plannerutil.RewriteAstExprWithPlanCtx(planCtx, assign.Expr, nil, nil, false)
		// Warnings from column-assignment expressions are static, but they need to
		// be reported for each processed row, so we save them here and clear them
		// from the statement context.
		allWarnings = append(allWarnings, planCtx.GetSessionVars().StmtCtx.GetWarnings()...)
		planCtx.GetSessionVars().StmtCtx.SetWarnings(nil)
		if err != nil {
			return nil, nil, err
		}
		res = append(res, newExpr)
	}
	return res, allWarnings, nil
}

// CreateColAssignSimpleExprs creates the column assignment expressions using `expression.BuildContext`.
// This method does not support:
// - Subquery
// - System Variables (e.g. `@@tidb_enable_async_commit`)
// - Window functions
// - Aggregate functions
// - Other special functions used in some specified queries such as `GROUPING`, `VALUES` ...
func (e *LoadDataController) CreateColAssignSimpleExprs(ctx expression.BuildContext) (_ []expression.Expression, _ []contextutil.SQLWarn, retErr error) {
	e.colAssignMu.Lock()
	defer e.colAssignMu.Unlock()
	res := make([]expression.Expression, 0, len(e.ColumnAssignments))
	var allWarnings []contextutil.SQLWarn
	for _, assign := range e.ColumnAssignments {
		newExpr, err := expression.BuildSimpleExpr(ctx, assign.Expr)
		// Warnings from column-assignment expressions are static, but they need to
		// be reported for each processed row, so we save them here and clear them
		// from the evaluation context.
		if ctx.GetEvalCtx().WarningCount() > 0 {
			allWarnings = append(allWarnings, ctx.GetEvalCtx().TruncateWarnings(0)...)
		}
		if err != nil {
			return nil, nil, err
		}
		res = append(res, newExpr)
	}
	return res, allWarnings, nil
}

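// getLocalBackendCfg builds the lightning local backend configuration used for
// this import job.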
func (e *LoadDataController) getLocalBackendCfg(keyspace, pdAddr, dataDir string) local.BackendConfig {
	backendConfig := local.BackendConfig{
		PDAddr:                 pdAddr,
		LocalStoreDir:          dataDir,
		MaxConnPerStore:        config.DefaultRangeConcurrency,
		ConnCompressType:       config.CompressionNone,
		WorkerConcurrency:      *atomic.NewInt32(int32(e.ThreadCnt)),
		KVWriteBatchSize:       config.KVWriteBatchSize,
		RegionSplitBatchSize:   config.DefaultRegionSplitBatchSize,
		RegionSplitConcurrency: runtime.GOMAXPROCS(0),
		// enable after we support checkpoint
		CheckpointEnabled:           false,
		MemTableSize:                config.DefaultEngineMemCacheSize,
		LocalWriterMemCacheSize:     int64(config.DefaultLocalWriterMemCacheSize),
		ShouldCheckTiKV:             true,
		DupeDetectEnabled:           false,
		DuplicateDetectOpt:          common.DupDetectOpt{ReportErrOnDup: false},
		TiKVWorkerURL:               tidb.GetGlobalConfig().TiKVWorkerURL,
		StoreWriteBWLimit:           int(e.MaxWriteSpeed),
		MaxOpenFiles:                int(tidbutil.GenRLimit("table_import")),
		KeyspaceName:                keyspace,
		PausePDSchedulerScope:       config.PausePDSchedulerScopeTable,
		DisableAutomaticCompactions: true,
		BlockSize:                   config.DefaultBlockSize,
	}
	if e.IsRaftKV2 {
		backendConfig.RaftKV2SwitchModeDuration = config.DefaultSwitchTiKVModeInterval
	}
	return backendConfig
}

// FullTableName returns the fully qualified name of the table.
func (e *LoadDataController) FullTableName() string {
	return common.UniqueTable(e.DBName, e.Table.Meta().Name.O)
}

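// getDataSourceType returns DataSourceTypeQuery when importing from a SELECT
// statement, and DataSourceTypeFile otherwise.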
func getDataSourceType(p *plannercore.ImportInto) DataSourceType {
	if p.SelectPlan != nil {
		return DataSourceTypeQuery
	}
	return DataSourceTypeFile
}

// GetTargetNodeCPUCnt gets the CPU count of the target node where the IMPORT INTO
// job will be executed. The target node is the current node if it's a server-disk
// import, an import from a query, or if dist-task is disabled; otherwise it's a
// node managed by dist-task.
// Exported for testing.
func GetTargetNodeCPUCnt(ctx context.Context, sourceType DataSourceType, path string) (int, error) {
	if sourceType == DataSourceTypeQuery {
		return cpu.GetCPUCount(), nil
	}

	u, err2 := objstore.ParseRawURL(path)
	if err2 != nil {
		return 0, exeerrors.ErrLoadDataInvalidURI.GenWithStackByArgs(plannercore.ImportIntoDataSource,
			err2.Error())
	}

	serverDiskImport := objstore.IsLocal(u)
	if serverDiskImport || !vardef.EnableDistTask.Load() {
		return cpu.GetCPUCount(), nil
	}
	return handle.GetCPUCountOfNode(ctx)
}