// Copyright 2018 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package executor

import (
	"context"
	"fmt"
	"io"
	"path/filepath"
	"strings"
	"sync/atomic"
	"time"

	"github.com/pingcap/errors"
	"github.com/pingcap/failpoint"
	backup "github.com/pingcap/kvproto/pkg/brpb"
	"github.com/pingcap/tidb/br/pkg/lightning/log"
	"github.com/pingcap/tidb/br/pkg/lightning/mydump"
	"github.com/pingcap/tidb/br/pkg/storage"
	"github.com/pingcap/tidb/executor/asyncloaddata"
	"github.com/pingcap/tidb/executor/importer"
	"github.com/pingcap/tidb/expression"
	"github.com/pingcap/tidb/kv"
	"github.com/pingcap/tidb/parser/ast"
	"github.com/pingcap/tidb/parser/model"
	"github.com/pingcap/tidb/parser/mysql"
	"github.com/pingcap/tidb/parser/terror"
	plannercore "github.com/pingcap/tidb/planner/core"
	"github.com/pingcap/tidb/sessionctx"
	"github.com/pingcap/tidb/sessiontxn"
	"github.com/pingcap/tidb/table"
	"github.com/pingcap/tidb/types"
	"github.com/pingcap/tidb/util/chunk"
	"github.com/pingcap/tidb/util/dbterror/exeerrors"
	"github.com/pingcap/tidb/util/intest"
	"github.com/pingcap/tidb/util/logutil"
	"github.com/pingcap/tidb/util/sqlexec"
	"github.com/pingcap/tidb/util/stringutil"
	"github.com/pingcap/tidb/util/syncutil"
	"go.uber.org/zap"
	"golang.org/x/sync/errgroup"
)

var (
	taskQueueSize = 16 // the maximum number of pending tasks to commit in queue
)
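// Overview, informally: LoadDataExec.Next dispatches a LOAD DATA statement by
// file location. For server/remote files it builds readers from external
// storage and runs the load via LoadDataWorker.Load, either synchronously or,
// when DETACHED, in a background job whose ID is returned to the client. For
// client-local files it stores the LoadDataWorker in the session under
// LoadDataVarKey so the connection layer can feed it the uploaded data.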
// LoadDataExec represents a load data executor.
type LoadDataExec struct {
	baseExecutor

	FileLocRef     ast.FileLocRefTp
	OnDuplicate    ast.OnDuplicateKeyHandlingType
	loadDataWorker *LoadDataWorker
	detachHandled  bool
}

// Next implements the Executor Next interface.
func (e *LoadDataExec) Next(ctx context.Context, req *chunk.Chunk) (err error) {
	defer func() {
		// On the happy path, the detached worker releases the system session in
		// e.loadDataWorker.doLoad, and no error can occur after that call.
		// Here we handle the case that an error occurred before e.loadDataWorker.doLoad.
		if err != nil && e.loadDataWorker.controller.Detached {
			ctx2 := context.Background()
			ctx2 = kv.WithInternalSourceType(ctx2, kv.InternalLoadData)
			e.loadDataWorker.releaseSysSession(ctx2, e.loadDataWorker.Ctx)
		}
	}()

	req.GrowAndReset(e.maxChunkSize)
	if e.detachHandled {
		// need to return an empty req to indicate all results have been written
		return nil
	}
	ctx = kv.WithInternalSourceType(ctx, kv.InternalLoadData)

	switch e.FileLocRef {
	case ast.FileLocServerOrRemote:
		u, err2 := storage.ParseRawURL(e.loadDataWorker.GetInfilePath())
		if err2 != nil {
			return exeerrors.ErrLoadDataInvalidURI.GenWithStackByArgs(err2.Error())
		}
		path := strings.Trim(u.Path, "/")
		u.Path = ""
		b, err2 := storage.ParseBackendFromURL(u, nil)
		if err2 != nil {
			return exeerrors.ErrLoadDataInvalidURI.GenWithStackByArgs(getMsgFromBRError(err2))
		}
		if b.GetLocal() != nil {
			return exeerrors.ErrLoadDataFromServerDisk.GenWithStackByArgs(e.loadDataWorker.GetInfilePath())
		}
		// check for glob pattern errors in advance
		_, err2 = filepath.Match(stringutil.EscapeGlobExceptAsterisk(path), "")
		if err2 != nil {
			return exeerrors.ErrLoadDataInvalidURI.GenWithStackByArgs("Glob pattern error: " + err2.Error())
		}
		jobID, err2 := e.loadFromRemote(ctx, b, path)
		if err2 != nil {
			return err2
		}
		if e.loadDataWorker.controller.Detached {
			req.AppendInt64(0, jobID)
			e.detachHandled = true
		}
	case ast.FileLocClient:
		if e.loadDataWorker.controller.Detached {
			return exeerrors.ErrLoadDataCantDetachWithLocal
		}

		// let the caller use handleQuerySpecial to read data in this connection
		sctx := e.loadDataWorker.ctx
		val := sctx.Value(LoadDataVarKey)
		if val != nil {
			sctx.SetValue(LoadDataVarKey, nil)
			return errors.New("previous load data option wasn't closed normally")
		}
		sctx.SetValue(LoadDataVarKey, e.loadDataWorker)
	}
	return nil
}
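// For illustration, with a hypothetical URI: given INFILE
// 's3://bucket/data/*.csv', Next above trims the object path to "data/*.csv",
// and loadFromRemote below lists objects with the prefix "data/" (everything
// before the first '*') and keeps those matching the escaped glob pattern,
// e.g. data/part1.csv and data/part2.csv.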
func (e *LoadDataExec) loadFromRemote(
	ctx context.Context,
	b *backup.StorageBackend,
	path string,
) (int64, error) {
	opt := &storage.ExternalStorageOptions{}
	if intest.InTest {
		opt.NoCredentials = true
	}
	s, err := storage.New(ctx, b, opt)
	if err != nil {
		return 0, exeerrors.ErrLoadDataCantAccess
	}

	idx := strings.IndexByte(path, '*')
	// simple path when the INFILE represents a single file
	if idx == -1 {
		opener := func(ctx context.Context) (io.ReadSeekCloser, error) {
			fileReader, err2 := s.Open(ctx, path)
			if err2 != nil {
				return nil, exeerrors.ErrLoadDataCantRead.GenWithStackByArgs(getMsgFromBRError(err2), "Please check the INFILE path is correct")
			}
			return fileReader, nil
		}

		// Try to read the file size to report progress. Don't fail the main
		// load if this fails, to tolerate transient errors.
		filesize := int64(-1)
		reader, err2 := opener(ctx)
		if err2 == nil {
			size, err3 := reader.Seek(0, io.SeekEnd)
			if err3 != nil {
				logutil.Logger(ctx).Warn("failed to read file size by seek in LOAD DATA",
					zap.Error(err3))
			} else {
				filesize = size
			}
			terror.Log(reader.Close())
		}

		return e.loadDataWorker.Load(ctx, []LoadDataReaderInfo{{
			Opener: opener,
			Remote: &loadRemoteInfo{store: s, path: path, size: filesize},
		}})
	}

	// when the INFILE represents multiple files
	readerInfos := make([]LoadDataReaderInfo, 0, 8)
	commonPrefix := path[:idx]
	// we only support '*' as the wildcard; to reuse the glob library we
	// manually escape the rest of the path
	escapedPath := stringutil.EscapeGlobExceptAsterisk(path)

	err = s.WalkDir(ctx, &storage.WalkOption{ObjPrefix: commonPrefix},
		func(remotePath string, size int64) error {
			// the pattern is already validated in LoadDataExec.Next
			//nolint: errcheck
			match, _ := filepath.Match(escapedPath, remotePath)
			if !match {
				return nil
			}
			readerInfos = append(readerInfos, LoadDataReaderInfo{
				Opener: func(ctx context.Context) (io.ReadSeekCloser, error) {
					fileReader, err2 := s.Open(ctx, remotePath)
					if err2 != nil {
						return nil, exeerrors.ErrLoadDataCantRead.GenWithStackByArgs(getMsgFromBRError(err2), "Please check the INFILE path is correct")
					}
					return fileReader, nil
				},
				Remote: &loadRemoteInfo{
					store: s,
					path:  remotePath,
					size:  size,
				},
			})
			return nil
		})
	if err != nil {
		return 0, err
	}

	return e.loadDataWorker.Load(ctx, readerInfos)
}
// Close implements the Executor Close interface.
func (e *LoadDataExec) Close() error {
	if e.runtimeStats != nil && e.loadDataWorker != nil && e.loadDataWorker.stats != nil {
		defer e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.loadDataWorker.stats)
	}
	return nil
}
// Open implements the Executor Open interface.
func (e *LoadDataExec) Open(_ context.Context) error {
	if e.loadDataWorker.insertColumns != nil {
		e.loadDataWorker.initEvalBuffer()
	}
	// Init for runtime stats.
	e.loadDataWorker.collectRuntimeStatsEnabled()
	return nil
}
// commitTask is used for passing data from the processStream goroutine to the
// commitWork goroutine.
type commitTask struct {
	cnt  uint64
	rows [][]types.Datum

	loadedRowCnt    uint64
	scannedFileSize int64
}
type loadRemoteInfo struct {
	store storage.ExternalStorage
	path  string
	size  int64
}
// LoadDataWorker does a LOAD DATA job.
type LoadDataWorker struct {
	*InsertValues

	// TODO: remove this field, use the embedded InsertValues.ctx instead.
	Ctx sessionctx.Context
	// Data interpretation is restrictive if the SQL mode is restrictive and
	// neither the IGNORE nor the LOCAL modifier is specified. Errors terminate
	// the load operation.
	// ref https://dev.mysql.com/doc/refman/8.0/en/load-data.html#load-data-column-assignments
	restrictive bool

	controller *importer.LoadDataController

	table           table.Table
	row             []types.Datum
	rows            [][]types.Datum
	commitTaskQueue chan commitTask
	finishedSize    int64
	progress        atomic.Pointer[asyncloaddata.Progress]
	getSysSessionFn func() (sessionctx.Context, error)
	putSysSessionFn func(context.Context, sessionctx.Context)
}
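// A note on session ownership, informally: in DETACHED mode the worker below
// acquires a system session via getSysSessionFn and uses it as its own
// context, because the user's session must stay usable for the connection.
// The worker is then responsible for returning that session through
// putSysSessionFn once the background load ends or construction fails.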
// NewLoadDataWorker creates a new LoadDataWorker that is ready to work.
func NewLoadDataWorker(
	userSctx sessionctx.Context,
	plan *plannercore.LoadData,
	tbl table.Table,
	getSysSessionFn func() (sessionctx.Context, error),
	putSysSessionFn func(context.Context, sessionctx.Context),
) (w *LoadDataWorker, err error) {
	ownSession := false
	sctx := userSctx
	defer func() {
		if err != nil && ownSession {
			// release the system session we acquired below on failure
			ctx := context.Background()
			ctx = kv.WithInternalSourceType(ctx, kv.InternalLoadData)
			putSysSessionFn(ctx, sctx)
		}
	}()

	controller, err := importer.NewLoadDataController(userSctx, plan, tbl)
	if err != nil {
		return nil, err
	}

	if controller.Detached {
		if plan.FileLocRef == ast.FileLocClient {
			return nil, exeerrors.ErrLoadDataCantDetachWithLocal
		}
		sysSession, err2 := getSysSessionFn()
		if err2 != nil {
			return nil, err2
		}
		ownSession = true
		sctx = sysSession

		err = ResetContextOfStmt(sysSession, &ast.LoadDataStmt{})
		if err != nil {
			return nil, err
		}
		// copy the related variables to the new session
		// I'm not confident that all needed variables are copied :(
		from := userSctx.GetSessionVars()
		to := sysSession.GetSessionVars()
		to.User = from.User
		to.CurrentDB = from.CurrentDB
		to.SQLMode = from.SQLMode
	}

	insertVal := &InsertValues{
		baseExecutor:   newBaseExecutor(sctx, nil, plan.ID()),
		Table:          tbl,
		Columns:        plan.Columns,
		GenExprs:       plan.GenCols.Exprs,
		isLoadData:     true,
		txnInUse:       syncutil.Mutex{},
		maxRowsInBatch: uint64(controller.GetBatchSize()),
	}
	restrictive := sctx.GetSessionVars().SQLMode.HasStrictMode() &&
		plan.OnDuplicate != ast.OnDuplicateKeyHandlingIgnore
	if !restrictive {
		// TODO: DupKeyAsWarning represents too many "ignore error" paths, and
		// the meaning of this flag is not clear, so we can only reuse it here.
		sctx.GetSessionVars().StmtCtx.DupKeyAsWarning = true
		sctx.GetSessionVars().StmtCtx.TruncateAsWarning = true
		sctx.GetSessionVars().StmtCtx.BadNullAsWarning = true
	}
	loadDataWorker := &LoadDataWorker{
		row:             make([]types.Datum, 0, len(insertVal.insertColumns)),
		commitTaskQueue: make(chan commitTask, taskQueueSize),
		InsertValues:    insertVal,
		table:           tbl,
		controller:      controller,
		Ctx:             sctx,
		restrictive:     restrictive,
		getSysSessionFn: getSysSessionFn,
		putSysSessionFn: putSysSessionFn,
	}
	if err = loadDataWorker.initInsertValues(); err != nil {
		return nil, err
	}
	loadDataWorker.ResetBatch()
	return loadDataWorker, nil
}
func (e *LoadDataWorker) initInsertValues() error {
	e.insertColumns = e.controller.GetInsertColumns()
	e.rowLen = len(e.insertColumns)

	for _, col := range e.insertColumns {
		if col.Name.L == model.ExtraHandleName.L {
			if !e.ctx.GetSessionVars().AllowWriteRowID {
				return errors.Errorf("load data statement for _tidb_rowid is not supported")
			}
			e.hasExtraHandle = true
			break
		}
	}

	return nil
}
// LoadDataReaderInfo provides information for a data reader of LOAD DATA.
type LoadDataReaderInfo struct {
	// Opener can be called as needed to get an io.ReadSeekCloser. It will only
	// be called once.
	Opener func(ctx context.Context) (io.ReadSeekCloser, error)
	// Remote is not nil only if we load from cloud storage.
	Remote *loadRemoteInfo
}
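// A caller-side sketch (hypothetical, not part of this file) of building a
// LoadDataReaderInfo for a client-local stream, reusing the
// SimpleSeekerOnReadCloser defined at the bottom of this file to satisfy the
// io.ReadSeekCloser requirement:
//
//	info := LoadDataReaderInfo{
//		Opener: func(ctx context.Context) (io.ReadSeekCloser, error) {
//			return NewSimpleSeekerOnReadCloser(clientData), nil
//		},
//	}
//
// Remote stays nil here, so the total source file size (and hence a
// percentage progress) is unknown.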
// Load reads from readerInfos and does the load data job.
func (e *LoadDataWorker) Load(
	ctx context.Context,
	readerInfos []LoadDataReaderInfo,
) (int64, error) {
	var (
		jobID int64
		err   error
	)

	s, err := e.getSysSessionFn()
	if err != nil {
		return 0, err
	}
	defer e.putSysSessionFn(ctx, s)

	sqlExec := s.(sqlexec.SQLExecutor)

	jobID, err = asyncloaddata.CreateLoadDataJob(
		ctx,
		sqlExec,
		e.GetInfilePath(),
		e.controller.SchemaName,
		e.table.Meta().Name.O,
		importer.LogicalImportMode,
		e.Ctx.GetSessionVars().User.String(),
	)
	if err != nil {
		return 0, err
	}

	if e.controller.Detached {
		go func() {
			detachedCtx := context.Background()
			detachedCtx = kv.WithInternalSourceType(detachedCtx, kv.InternalLoadData)
			// the error is stored in the system table, so we can ignore it here
			//nolint: errcheck
			_ = e.doLoad(detachedCtx, readerInfos, jobID)
		}()
		return jobID, nil
	}
	return jobID, e.doLoad(ctx, readerInfos, jobID)
}
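// doLoad runs the job as a small pipeline of three goroutines, roughly:
//
//	processStream: parser -> commitTask -> commitTaskQueue
//	commitWork:    commitTaskQueue -> insert & commit, update e.progress
//	progress:      tick every HeartBeatInSec, write e.progress to the system table
//
// All three are tied together by an errgroup; closing commitTaskQueue signals
// the end of input, and the done channel tells the progress goroutine to
// write the final state.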
func (e *LoadDataWorker) doLoad(
	ctx context.Context,
	readerInfos []LoadDataReaderInfo,
	jobID int64,
) (err error) {
	defer func() {
		if e.controller.Detached {
			e.putSysSessionFn(ctx, e.Ctx)
		}
	}()

	// Get a session for UpdateJobProgress. e.Ctx is exclusively used by commitWork.
	s, err := e.getSysSessionFn()
	if err != nil {
		return err
	}
	defer e.putSysSessionFn(ctx, s)

	sqlExec := s.(sqlexec.SQLExecutor)

	defer func() {
		// write the ending status even if the user context is canceled.
		ctx2 := context.Background()
		ctx2 = kv.WithInternalSourceType(ctx2, kv.InternalLoadData)
		if err == nil {
			err2 := asyncloaddata.FinishJob(
				ctx2,
				sqlExec,
				jobID,
				e.Ctx.GetSessionVars().StmtCtx.GetMessage())
			terror.Log(err2)
			return
		}
		errMsg := err.Error()
		if errImpl, ok := errors.Cause(err).(*errors.Error); ok {
			b, marshalErr := errImpl.MarshalJSON()
			if marshalErr == nil {
				errMsg = string(b)
			}
		}

		err2 := asyncloaddata.FailJob(ctx2, sqlExec, jobID, errMsg)
		terror.Log(err2)
	}()

	failpoint.Inject("AfterCreateLoadDataJob", nil)

	progress := asyncloaddata.Progress{
		SourceFileSize: -1,
		LoadedFileSize: 0,
		LoadedRowCnt:   0,
	}

	totalFilesize := int64(0)
	hasErr := false
	for _, readerInfo := range readerInfos {
		if readerInfo.Remote == nil {
			logutil.Logger(ctx).Warn("can not get total file size when LOAD DATA from local file")
			hasErr = true
			break
		}
		totalFilesize += readerInfo.Remote.size
	}
	if !hasErr {
		progress.SourceFileSize = totalFilesize
	}
	e.progress.Store(&progress)

	err = asyncloaddata.StartJob(ctx, sqlExec, jobID)
	if err != nil {
		return err
	}

	failpoint.Inject("AfterStartJob", nil)

	group, groupCtx := errgroup.WithContext(ctx)
	// done is used to let the commitWork goroutine notify the UpdateJobProgress
	// goroutine that the job is finished.
	done := make(chan struct{})
	// both the processStream and commitWork goroutines will use this txn.
	err = sessiontxn.NewTxn(ctx, e.Ctx)
	if err != nil {
		return err
	}

	// processStream goroutine.
	var parser mydump.Parser
	group.Go(func() error {
		for _, info := range readerInfos {
			reader, err2 := info.Opener(ctx)
			if err2 != nil {
				return err2
			}

			parser, err2 = e.buildParser(ctx, reader, info.Remote)
			if err2 != nil {
				terror.Log(reader.Close())
				return err2
			}
			err2 = e.processStream(groupCtx, parser, reader)
			terror.Log(reader.Close())
			if err2 != nil {
				return err2
			}
		}

		close(e.commitTaskQueue)
		return nil
	})
	// commitWork goroutine.
	group.Go(func() error {
		failpoint.Inject("BeforeCommitWork", nil)
		err2 := e.commitWork(groupCtx)
		if err2 == nil {
			close(done)
		}
		return err2
	})
	// UpdateJobProgress goroutine.
	group.Go(func() error {
		ticker := time.NewTicker(time.Duration(asyncloaddata.HeartBeatInSec) * time.Second)
		defer ticker.Stop()

		for {
			select {
			case <-done:
				// try one last update to set the progress to 100%
				p := e.progress.Load()
				ok, err2 := asyncloaddata.UpdateJobProgress(ctx, sqlExec, jobID, p.String())
				if !ok || err2 != nil {
					logutil.Logger(ctx).Warn("failed to update job progress when finished",
						zap.Bool("ok", ok), zap.Error(err2))
				}
				return nil
			case <-groupCtx.Done():
				return nil
			case <-ticker.C:
				p := e.progress.Load()
				ok, err2 := asyncloaddata.UpdateJobProgress(ctx, sqlExec, jobID, p.String())
				if err2 != nil {
					return err2
				}
				if !ok {
					return errors.Errorf("failed to update job progress, the job %d is interrupted by user or failed to keepalive", jobID)
				}
			}
		}
	})

	err = group.Wait()
	e.SetMessage()
	return err
}
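// buildParser picks a parser by e.controller.Format. Note the IGNORE N LINES
// nuance below: for CSV input a "line" is a physical line ending with the
// line terminator (skipped via ReadUntilTerminator), while for other formats
// a "line" is approximated by one parsed row.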
func (e *LoadDataWorker) buildParser(
	ctx context.Context,
	reader io.ReadSeekCloser,
	remote *loadRemoteInfo,
) (parser mydump.Parser, err error) {
	switch e.controller.Format {
	case importer.LoadDataFormatDelimitedData:
		// CSV-like
		parser, err = mydump.NewCSVParser(
			ctx,
			e.controller.GenerateCSVConfig(),
			reader,
			importer.LoadDataReadBlockSize,
			nil,
			false,
			// TODO: support charset conversion
			nil)
	case importer.LoadDataFormatSQLDump:
		parser = mydump.NewChunkParser(
			ctx,
			e.Ctx.GetSessionVars().SQLMode,
			reader,
			importer.LoadDataReadBlockSize,
			nil,
		)
	case importer.LoadDataFormatParquet:
		if remote == nil {
			return nil, exeerrors.ErrLoadParquetFromLocal
		}
		parser, err = mydump.NewParquetParser(
			ctx,
			remote.store,
			reader,
			remote.path,
		)
	}
	if err != nil {
		return nil, exeerrors.ErrLoadDataWrongFormatConfig.GenWithStack(err.Error())
	}
	parser.SetLogger(log.Logger{Logger: logutil.Logger(ctx)})

	// handle IGNORE N LINES
	ignoreOneLineFn := parser.ReadRow
	if csvParser, ok := parser.(*mydump.CSVParser); ok {
		ignoreOneLineFn = func() error {
			_, _, err := csvParser.ReadUntilTerminator()
			return err
		}
	}

	ignoreLineCnt := e.controller.IgnoreLines
	for ignoreLineCnt > 0 {
		err = ignoreOneLineFn()
		if err != nil {
			if errors.Cause(err) == io.EOF {
				return parser, nil
			}
			return nil, err
		}

		ignoreLineCnt--
	}
	return parser, nil
}
// processStream processes the input stream from the parser. When it returns
// nil, all data has been read.
func (e *LoadDataWorker) processStream(
	ctx context.Context,
	parser mydump.Parser,
	seeker io.Seeker,
) (err error) {
	defer func() {
		r := recover()
		if r != nil {
			logutil.Logger(ctx).Error("process routine panicked",
				zap.Reflect("r", r),
				zap.Stack("stack"))
			err = errors.Errorf("%v", r)
		}
	}()

	checkKilled := time.NewTicker(30 * time.Second)
	defer checkKilled.Stop()

	var (
		loggedError     = false
		currScannedSize = int64(0)
	)
	for {
		// prepare batch and enqueue task
		if err = e.ReadOneBatchRows(ctx, parser); err != nil {
			return
		}
		if e.curBatchCnt == 0 {
			e.finishedSize += currScannedSize
			return
		}

	TrySendTask:
		currScannedSize, err = seeker.Seek(0, io.SeekCurrent)
		if err != nil && !loggedError {
			loggedError = true
			logutil.Logger(ctx).Error("LOAD DATA failed to read current file offset by seek",
				zap.Error(err))
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-checkKilled.C:
			if atomic.CompareAndSwapUint32(&e.Ctx.GetSessionVars().Killed, 1, 0) {
				logutil.Logger(ctx).Info("load data query interrupted, quit data processing")
				close(e.commitTaskQueue)
				return exeerrors.ErrQueryInterrupted
			}
			goto TrySendTask
		case e.commitTaskQueue <- commitTask{
			cnt:             e.curBatchCnt,
			rows:            e.rows,
			loadedRowCnt:    e.rowCount,
			scannedFileSize: e.finishedSize + currScannedSize,
		}:
		}
		// reset the rows buffer; this reallocates the buffer but does NOT reuse it
		e.ResetBatch()
	}
}
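// ResetBatch allocates fresh slices instead of truncating in place: the
// previous e.rows slice has just been handed to commitWork through
// commitTaskQueue and may still be read there, so reusing its backing array
// would race with the commit.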
// ResetBatch resets the inner batch.
func (e *LoadDataWorker) ResetBatch() {
	e.rows = make([][]types.Datum, 0, e.maxRowsInBatch)
	e.curBatchCnt = 0
}
// commitWork commits batches sequentially. When it returns nil, the job is
// finished.
func (e *LoadDataWorker) commitWork(ctx context.Context) (err error) {
	defer func() {
		r := recover()
		if r != nil {
			logutil.Logger(ctx).Error("commitWork panicked",
				zap.Reflect("r", r),
				zap.Stack("stack"))
			err = errors.Errorf("%v", r)
		}
	}()

	var (
		tasks               uint64
		lastScannedFileSize int64
		currScannedFileSize int64
		backgroundCtx       = context.Background()
	)
	for {
		select {
		case <-ctx.Done():
			e.Ctx.StmtRollback(backgroundCtx, false)
			_ = e.Ctx.RefreshTxnCtx(backgroundCtx)
			return ctx.Err()
		case task, ok := <-e.commitTaskQueue:
			if !ok {
				p := e.progress.Load()
				newP := &asyncloaddata.Progress{
					SourceFileSize: p.SourceFileSize,
					LoadedFileSize: currScannedFileSize,
					LoadedRowCnt:   p.LoadedRowCnt,
				}
				e.progress.Store(newP)
				return nil
			}
			start := time.Now()
			if err = e.commitOneTask(ctx, task); err != nil {
				return err
			}
			// We want to report the "loaded" progress rather than the "scanned"
			// progress; the difference between the two is that the latest block
			// we read may still be in processing. So when the "scanned" progress
			// moves forward, we know the previous block has been processed.
			// The corner case is a record larger than the block size, but that
			// should be rare.
			if task.scannedFileSize != currScannedFileSize {
				lastScannedFileSize = currScannedFileSize
				currScannedFileSize = task.scannedFileSize
			}
			p := e.progress.Load()
			newP := &asyncloaddata.Progress{
				SourceFileSize: p.SourceFileSize,
				LoadedFileSize: lastScannedFileSize,
				LoadedRowCnt:   task.loadedRowCnt,
			}
			e.progress.Store(newP)
			tasks++
			logutil.Logger(ctx).Info("commit one task success",
				zap.Duration("commit time usage", time.Since(start)),
				zap.Uint64("keys processed", task.cnt),
				zap.Uint64("tasks processed", tasks),
				zap.Int("tasks in queue", len(e.commitTaskQueue)))
			failpoint.Inject("AfterCommitOneTask", nil)
		}
	}
}
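// The per-task transaction lifecycle, roughly as implemented below:
// CheckAndInsertOneBatch writes the batch into the current statement buffer,
// StmtCommit merges that buffer into the transaction, and RefreshTxnCtx
// commits the transaction and opens a new one; the txnInUse mutex keeps other
// routines from observing the transaction while it is being swapped.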
// commitOneTask inserts the data of one task, then commits the statement and
// refreshes the txn.
func (e *LoadDataWorker) commitOneTask(ctx context.Context, task commitTask) error {
	var err error
	defer func() {
		if err != nil {
			e.Ctx.StmtRollback(ctx, false)
		}
	}()
	err = e.CheckAndInsertOneBatch(ctx, task.rows, task.cnt)
	if err != nil {
		logutil.Logger(ctx).Error("commit error CheckAndInsert", zap.Error(err))
		return err
	}
	failpoint.Inject("commitOneTaskErr", func() error {
		return errors.New("mock commit one task error")
	})
	e.Ctx.StmtCommit(ctx)
	// Make sure the processStream routine never uses an invalid txn.
	e.txnInUse.Lock()
	defer e.txnInUse.Unlock()
	// Make sure that there are no retries when committing.
	if err = e.Ctx.RefreshTxnCtx(ctx); err != nil {
		logutil.Logger(ctx).Error("commit error refresh", zap.Error(err))
		return err
	}
	return nil
}
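// How the LOAD DATA duplicate-key options map to the insert paths below:
// REPLACE and IGNORE both go through batchCheckAndInsert (the boolean flag
// selects replace-on-conflict vs skip-on-conflict), while the error-out
// default inserts row by row so the first conflict fails the load.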
// CheckAndInsertOneBatch checks and inserts one batch of rows into the
// current transaction.
func (e *LoadDataWorker) CheckAndInsertOneBatch(ctx context.Context, rows [][]types.Datum, cnt uint64) error {
	if e.stats != nil && e.stats.BasicRuntimeStats != nil {
		// Since this method is not called by the executor's Next, we need to
		// record the basic executor runtime stats ourselves.
		start := time.Now()
		defer func() {
			e.stats.BasicRuntimeStats.Record(time.Since(start), 0)
		}()
	}
	var err error
	if cnt == 0 {
		return err
	}
	e.ctx.GetSessionVars().StmtCtx.AddRecordRows(cnt)

	switch e.controller.OnDuplicate {
	case ast.OnDuplicateKeyHandlingReplace:
		return e.batchCheckAndInsert(ctx, rows[0:cnt], e.addRecordLD, true)
	case ast.OnDuplicateKeyHandlingIgnore:
		return e.batchCheckAndInsert(ctx, rows[0:cnt], e.addRecordLD, false)
	case ast.OnDuplicateKeyHandlingError:
		for i, row := range rows[0:cnt] {
			sizeHintStep := int(e.Ctx.GetSessionVars().ShardAllocateStep)
			if sizeHintStep > 0 && i%sizeHintStep == 0 {
				sizeHint := sizeHintStep
				remain := len(rows[0:cnt]) - i
				if sizeHint > remain {
					sizeHint = remain
				}
				err = e.addRecordWithAutoIDHint(ctx, row, sizeHint)
			} else {
				err = e.addRecord(ctx, row)
			}
			if err != nil {
				return err
			}
			e.ctx.GetSessionVars().StmtCtx.AddCopiedRows(1)
		}
		return nil
	default:
		return errors.Errorf("unknown on duplicate key handling: %v", e.controller.OnDuplicate)
	}
}
func (e *LoadDataWorker) addRecordLD(ctx context.Context, row []types.Datum) error {
	if row == nil {
		return nil
	}
	err := e.addRecord(ctx, row)
	if err != nil {
		if e.restrictive {
			return err
		}
		e.handleWarning(err)
	}
	return nil
}
// ReadOneBatchRows reads rows from the parser. It returns nil when the
// parser's reader meets EOF or when the rows batch is full; other errors are
// returned directly.
// The result rows are saved in e.rows and some members are updated; the
// caller can check curBatchCnt == 0 to know whether EOF was reached.
func (e *LoadDataWorker) ReadOneBatchRows(ctx context.Context, parser mydump.Parser) error {
	for {
		if err := parser.ReadRow(); err != nil {
			if errors.Cause(err) == io.EOF {
				return nil
			}
			return exeerrors.ErrLoadDataCantRead.GenWithStackByArgs(
				err.Error(),
				"Only the following formats are supported: delimited text file (csv, tsv), parquet, sql. Please provide the valid source file(s)",
			)
		}
		// rowCount will be used in fillRow(); the last insert ID is assigned
		// when rowCount = 1, so we should increment it first here.
		e.rowCount++
		r, err := e.parserData2TableData(ctx, parser.LastRow().Row)
		if err != nil {
			return err
		}
		e.rows = append(e.rows, r)
		e.curBatchCnt++
		if e.maxRowsInBatch != 0 && e.rowCount%e.maxRowsInBatch == 0 {
			logutil.Logger(ctx).Info("batch limit hit when inserting rows", zap.Int("maxBatchRows", e.maxChunkSize),
				zap.Uint64("totalRows", e.rowCount))
			return nil
		}
	}
}
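// For illustration, with a hypothetical statement
//
//	LOAD DATA ... (a, @v) SET b = UPPER(@v)
//
// the field mappings below hold one entry per input field: a real column for
// `a` (Column != nil) and a user variable for `@v` (Column == nil, handled by
// setVar), while the SET assignment for `b` is evaluated afterwards from
// e.controller.ColumnAssignments.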
// parserData2TableData encodes the data of parser output.
func (e *LoadDataWorker) parserData2TableData(
	ctx context.Context,
	parserData []types.Datum,
) ([]types.Datum, error) {
	var errColNumMismatch error
	switch {
	case len(parserData) < e.controller.GetFieldCount():
		errColNumMismatch = exeerrors.ErrWarnTooFewRecords.GenWithStackByArgs(e.rowCount)
	case len(parserData) > e.controller.GetFieldCount():
		errColNumMismatch = exeerrors.ErrWarnTooManyRecords.GenWithStackByArgs(e.rowCount)
	}

	if errColNumMismatch != nil {
		if e.restrictive {
			return nil, errColNumMismatch
		}
		e.handleWarning(errColNumMismatch)
	}

	row := make([]types.Datum, 0, len(e.insertColumns))
	sessionVars := e.Ctx.GetSessionVars()
	setVar := func(name string, col *types.Datum) {
		// User variable names are not case-sensitive
		// https://dev.mysql.com/doc/refman/8.0/en/user-variables.html
		name = strings.ToLower(name)
		if col == nil || col.IsNull() {
			sessionVars.UnsetUserVar(name)
		} else {
			sessionVars.SetUserVarVal(name, *col)
		}
	}

	fieldMappings := e.controller.GetFieldMapping()
	for i := 0; i < len(fieldMappings); i++ {
		if i >= len(parserData) {
			if fieldMappings[i].Column == nil {
				setVar(fieldMappings[i].UserVar.Name, nil)
				continue
			}

			// If a column is missing and its type is time with the NOT NULL
			// flag, it should be set to the current time.
			if types.IsTypeTime(fieldMappings[i].Column.GetType()) && mysql.HasNotNullFlag(fieldMappings[i].Column.GetFlag()) {
				row = append(row, types.NewTimeDatum(types.CurrentTime(fieldMappings[i].Column.GetType())))
				continue
			}

			row = append(row, types.NewDatum(nil))
			continue
		}

		if fieldMappings[i].Column == nil {
			setVar(fieldMappings[i].UserVar.Name, &parserData[i])
			continue
		}

		row = append(row, parserData[i])
	}
	for i := 0; i < len(e.controller.ColumnAssignments); i++ {
		// eval the expression of the `SET` clause
		d, err := expression.EvalAstExpr(e.Ctx, e.controller.ColumnAssignments[i].Expr)
		if err != nil {
			if e.restrictive {
				return nil, err
			}
			e.handleWarning(err)
		}
		row = append(row, d)
	}

	// a new row buffer will be allocated in getRow
	newRow, err := e.getRow(ctx, row)
	if err != nil {
		if e.restrictive {
			return nil, err
		}
		e.handleWarning(err)
		logutil.Logger(ctx).Error("failed to get row", zap.Error(err))
		// TODO: we should not return nil! The caller will panic when looking
		// up the index.
		return nil, nil
	}

	return newRow, nil
}
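// SetMessage below fills the MySQL-compatible summary line, e.g.
// "Records: 3  Deleted: 0  Skipped: 0  Warnings: 1" (the exact template comes
// from mysql.MySQLErrName[mysql.ErrLoadInfo]).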
// SetMessage sets the info message (ERR_LOAD_INFO) generated by the LOAD
// statement. It is public because of the special way the LOAD statement is
// handled.
func (e *LoadDataWorker) SetMessage() {
	stmtCtx := e.ctx.GetSessionVars().StmtCtx
	numRecords := stmtCtx.RecordRows()
	numDeletes := stmtCtx.DeletedRows()
	numSkipped := numRecords - stmtCtx.CopiedRows()
	numWarnings := stmtCtx.WarningCount()
	msg := fmt.Sprintf(mysql.MySQLErrName[mysql.ErrLoadInfo].Raw, numRecords, numDeletes, numSkipped, numWarnings)
	e.ctx.GetSessionVars().StmtCtx.SetMessage(msg)
}
// GetRows is the getter for rows.
func (e *LoadDataWorker) GetRows() [][]types.Datum {
	return e.rows
}

// GetCurBatchCnt is the getter for curBatchCnt.
func (e *LoadDataWorker) GetCurBatchCnt() uint64 {
	return e.curBatchCnt
}

// GetInfilePath gets the infile path.
func (e *LoadDataWorker) GetInfilePath() string {
	return e.controller.Path
}

// GetController gets the load data controller.
// It is used in unit tests.
func (e *LoadDataWorker) GetController() *importer.LoadDataController {
	return e.controller
}
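// SimpleSeekerOnReadCloser exists because processStream only needs
// Seek(0, io.SeekCurrent) to report how many bytes it has consumed; counting
// bytes in Read is enough to answer that without real seeking, which a
// network stream from the client cannot do anyway.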
var _ io.ReadSeekCloser = (*SimpleSeekerOnReadCloser)(nil)

// SimpleSeekerOnReadCloser provides Seek(0, SeekCurrent) on a ReadCloser.
type SimpleSeekerOnReadCloser struct {
	r   io.ReadCloser
	pos int
}

// NewSimpleSeekerOnReadCloser creates a SimpleSeekerOnReadCloser.
func NewSimpleSeekerOnReadCloser(r io.ReadCloser) *SimpleSeekerOnReadCloser {
	return &SimpleSeekerOnReadCloser{r: r}
}

// Read implements io.Reader.
func (s *SimpleSeekerOnReadCloser) Read(p []byte) (n int, err error) {
	n, err = s.r.Read(p)
	s.pos += n
	return
}

// Seek implements io.Seeker.
func (s *SimpleSeekerOnReadCloser) Seek(offset int64, whence int) (int64, error) {
	// we only support getting the reader's current offset
	if offset == 0 && whence == io.SeekCurrent {
		return int64(s.pos), nil
	}
	return 0, errors.Errorf("unsupported seek on SimpleSeekerOnReadCloser, offset: %d whence: %d", offset, whence)
}

// Close implements io.Closer.
func (s *SimpleSeekerOnReadCloser) Close() error {
	return s.r.Close()
}
// loadDataVarKeyType is a dummy type to avoid naming collision in context.
type loadDataVarKeyType int

// String defines a Stringer function for debugging and pretty printing.
func (k loadDataVarKeyType) String() string {
	return "load_data_var"
}

// LoadDataVarKey is a variable key for load data.
const LoadDataVarKey loadDataVarKeyType = 0
var (
	_ Executor = (*LoadDataActionExec)(nil)
)

// LoadDataActionExec executes LoadDataActionStmt.
type LoadDataActionExec struct {
	baseExecutor

	tp    ast.LoadDataActionTp
	jobID int64
}

// Next implements the Executor Next interface.
func (e *LoadDataActionExec) Next(ctx context.Context, _ *chunk.Chunk) error {
	sqlExec := e.ctx.(sqlexec.SQLExecutor)
	user := e.ctx.GetSessionVars().User.String()

	switch e.tp {
	case ast.LoadDataCancel:
		return asyncloaddata.CancelJob(ctx, sqlExec, e.jobID, user)
	case ast.LoadDataDrop:
		return asyncloaddata.DropJob(ctx, sqlExec, e.jobID, user)
	default:
		return errors.Errorf("not implemented LOAD DATA action %v", e.tp)
	}
}