// File: tidb/pkg/executor/import_into.go
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package executor
import (
"context"
"fmt"
"github.com/google/uuid"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/config/kerneltype"
"github.com/pingcap/tidb/pkg/dxf/framework/handle"
"github.com/pingcap/tidb/pkg/dxf/framework/proto"
dxfstorage "github.com/pingcap/tidb/pkg/dxf/framework/storage"
"github.com/pingcap/tidb/pkg/dxf/importinto"
"github.com/pingcap/tidb/pkg/executor/importer"
"github.com/pingcap/tidb/pkg/executor/internal/exec"
"github.com/pingcap/tidb/pkg/expression"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/lightning/backend/encode"
litkv "github.com/pingcap/tidb/pkg/lightning/backend/kv"
"github.com/pingcap/tidb/pkg/lightning/log"
"github.com/pingcap/tidb/pkg/objstore"
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/parser/mysql"
plannercore "github.com/pingcap/tidb/pkg/planner/core"
"github.com/pingcap/tidb/pkg/privilege"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/vardef"
"github.com/pingcap/tidb/pkg/table"
"github.com/pingcap/tidb/pkg/util/chunk"
"github.com/pingcap/tidb/pkg/util/dbterror/exeerrors"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/tikv/client-go/v2/util"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)
// ImportIntoExec represents a IMPORT INTO executor.
type ImportIntoExec struct {
	exec.BaseExecutor

	// selectExec executes the SELECT clause of `IMPORT INTO ... FROM SELECT ...`;
	// nil when importing from data files.
	selectExec exec.Executor
	// userSctx is the session context of the user running the statement.
	userSctx sessionctx.Context
	// controller drives the load; it is created lazily in Next and closed in Close.
	controller *importer.LoadDataController
	// stmt is the original statement text, forwarded when submitting the task.
	stmt string

	plan *plannercore.ImportInto
	tbl  table.Table
	// dataFilled records that the result chunk has already been populated, so a
	// later Next call returns an empty chunk to signal that all rows were sent.
	dataFilled bool
}
// Compile-time check that ImportIntoExec satisfies the Executor interface.
var (
	_ exec.Executor = (*ImportIntoExec)(nil)
)
// newImportIntoExec builds an ImportIntoExec for the given plan and target
// table. selectExec is non-nil only for `IMPORT INTO ... FROM SELECT ...`.
func newImportIntoExec(b exec.BaseExecutor, selectExec exec.Executor, userSctx sessionctx.Context,
	plan *plannercore.ImportInto, tbl table.Table) (*ImportIntoExec, error) {
	executor := &ImportIntoExec{
		BaseExecutor: b,
		selectExec:   selectExec,
		userSctx:     userSctx,
		stmt:         plan.Stmt,
		plan:         plan,
		tbl:          tbl,
	}
	return executor, nil
}
// Next implements the Executor Next interface.
//
// For file-based IMPORT INTO it validates the plan, submits the job as a DXF
// task, optionally waits for completion, and fills one row of job info into
// req. For `IMPORT INTO ... FROM SELECT` it delegates to importFromSelect.
func (e *ImportIntoExec) Next(ctx context.Context, req *chunk.Chunk) (err error) {
	req.GrowAndReset(e.MaxChunkSize())
	ctx = kv.WithInternalSourceType(ctx, kv.InternalImportInto)
	if e.dataFilled {
		// need to return an empty req to indicate all results have been written
		return nil
	}

	// Build and validate the import plan, then create the controller.
	importPlan, err := importer.NewImportPlan(ctx, e.userSctx, e.plan, e.tbl)
	if err != nil {
		return err
	}
	astArgs := importer.ASTArgsFromImportPlan(e.plan)
	if err = ValidateImportIntoColAssignmentsWithEncodeCtx(importPlan, astArgs.ColumnAssignments); err != nil {
		return err
	}
	controller, err := importer.NewLoadDataController(importPlan, e.tbl, astArgs)
	if err != nil {
		return err
	}
	e.controller = controller

	if e.selectExec != nil {
		// `import from select` doesn't return rows, so no need to set dataFilled.
		return e.importFromSelect(ctx)
	}

	if err = e.controller.InitDataFiles(ctx); err != nil {
		return err
	}
	if kerneltype.IsNextGen() {
		ksCodec := e.userSctx.GetStore().GetCodec().GetKeyspace()
		if err = e.controller.CalResourceParams(ctx, ksCodec); err != nil {
			return err
		}
	}

	// must use a new session to pre-check, else the stmt in show processlist will be changed.
	newSCtx, err := CreateSession(e.userSctx)
	if err != nil {
		return err
	}
	defer CloseSession(newSCtx)
	if err = e.controller.CheckRequirements(ctx, newSCtx); err != nil {
		return err
	}
	if err = e.controller.InitTiKVConfigs(ctx, newSCtx); err != nil {
		return err
	}

	failpoint.InjectCall("cancellableCtx", &ctx)
	jobID, task, err := e.submitTask(ctx)
	if err != nil {
		return err
	}
	if !e.controller.Detached {
		if err = e.waitTask(ctx, jobID, task); err != nil {
			return err
		}
	}
	return e.fillJobInfo(ctx, jobID, req)
}
// ValidateImportIntoColAssignmentsWithEncodeCtx validates the column assignment expressions should be compatible with the
// encoding context (which maybe different with the context in the current session).
// For example, the function `tidb_is_ddl_owner()` requires the optional eval properties which are not
// provided by the encoding context, so we should avoid using it in the column assignment expressions.
func ValidateImportIntoColAssignmentsWithEncodeCtx(plan *importer.Plan, assigns []*ast.Assignment) error {
	sessOpts := &encode.SessionOptions{
		SQLMode: plan.SQLMode,
		SysVars: plan.ImportantSysVars,
	}
	encodeCtx, err := litkv.NewSession(sessOpts, log.L())
	if err != nil {
		return err
	}
	exprCtx := encodeCtx.GetExprCtx()
	providedProps := exprCtx.GetEvalCtx().GetOptionalPropSet()
	for idx, assign := range assigns {
		built, buildErr := expression.BuildSimpleExpr(exprCtx, assign.Expr)
		if buildErr != nil {
			return buildErr
		}
		if checkErr := checkExprWithProvidedProps(idx, built, providedProps); checkErr != nil {
			return checkErr
		}
	}
	return nil
}
// checkExprWithProvidedProps recursively verifies that every scalar function
// inside expr only requires optional eval properties contained in props. idx is
// the index of the column assignment and is used in the error message.
func checkExprWithProvidedProps(idx int, expr expression.Expression, props expression.OptionalEvalPropKeySet) error {
	sf, isScalar := expr.(*expression.ScalarFunction)
	if !isScalar {
		// Non-function expressions (columns, constants) need no properties.
		return nil
	}
	// The function is unsupported if it requires any property outside props.
	if required := sf.Function.RequiredOptionalEvalProps(); required|props != props {
		return errors.Errorf("FUNCTION %s is not supported in IMPORT INTO column assignment, index %d", sf.FuncName.O, idx)
	}
	for _, child := range sf.GetArgs() {
		if err := checkExprWithProvidedProps(idx, child, props); err != nil {
			return err
		}
	}
	return nil
}
// fillJobInfo loads the job row for jobID and appends it to req as the
// statement's single result row.
func (e *ImportIntoExec) fillJobInfo(ctx context.Context, jobID int64, req *chunk.Chunk) error {
	e.dataFilled = true
	// we use taskManager to get job, user might not have the privilege to system tables.
	taskManager, err := dxfstorage.GetTaskManager()
	if err != nil {
		return err
	}
	ctx = util.WithInternalSourceType(ctx, kv.InternalDistTask)
	var jobInfo *importer.JobInfo
	err = taskManager.WithNewSession(func(se sessionctx.Context) error {
		var getErr error
		jobInfo, getErr = importer.GetJob(ctx, se.GetSQLExecutor(), jobID, e.Ctx().GetSessionVars().User.String(), false)
		return getErr
	})
	if err != nil {
		return err
	}
	FillOneImportJobInfo(req, jobInfo, nil)
	return nil
}
// submitTask submits the IMPORT INTO job, either distributively or as a
// standalone task on the current node, and returns the job ID plus task base.
func (e *ImportIntoExec) submitTask(ctx context.Context) (int64, *proto.TaskBase, error) {
	fromServer, err := objstore.IsLocalPath(e.controller.Path)
	if err != nil {
		// since we have checked this during creating controller, this should not happen.
		return 0, nil, exeerrors.ErrLoadDataInvalidURI.FastGenByArgs(plannercore.ImportIntoDataSource, err.Error())
	}
	logutil.Logger(ctx).Info("get job importer", zap.Stringer("param", e.controller.Parameters),
		zap.Bool("dist-task-enabled", vardef.EnableDistTask.Load()))
	switch {
	case fromServer:
		// Server-local files: chunks must be computed up front on this node.
		chunkMap, popErr := e.controller.PopulateChunks(ctx)
		if popErr != nil {
			return 0, nil, popErr
		}
		return importinto.SubmitStandaloneTask(ctx, e.controller.Plan, e.stmt, chunkMap)
	case vardef.EnableDistTask.Load():
		// if tidb_enable_dist_task=true, we import distributively, otherwise we import on current node.
		return importinto.SubmitTask(ctx, e.controller.Plan, e.stmt)
	default:
		return importinto.SubmitStandaloneTask(ctx, e.controller.Plan, e.stmt, nil)
	}
}
// waitTask waits for the task to finish.
// NOTE: WaitTaskDoneOrPaused also return error when task fails.
func (*ImportIntoExec) waitTask(ctx context.Context, jobID int64, task *proto.TaskBase) error {
	waitErr := handle.WaitTaskDoneOrPaused(ctx, task.ID)
	if errors.Cause(waitErr) != context.Canceled {
		return waitErr
	}
	// when user KILL the connection, the ctx will be canceled, we need to cancel the import job.
	// use background, since ctx is canceled already.
	return cancelAndWaitImportJob(context.Background(), jobID)
}
// importFromSelect implements `IMPORT INTO ... FROM SELECT ...`: it drains the
// SELECT sub-executor chunk by chunk and feeds the rows through a channel to a
// TableImporter running concurrently, then records the imported-row count in
// the user session's statement context. It does not produce result rows.
func (e *ImportIntoExec) importFromSelect(ctx context.Context) error {
	e.dataFilled = true
	// must use a new session as:
	// - pre-check will execute other sql, the stmt in show processlist will be changed.
	// - userSctx might be in stale read, we cannot do write.
	newSCtx, err2 := CreateSession(e.userSctx)
	if err2 != nil {
		return err2
	}
	defer CloseSession(newSCtx)
	if err2 = e.controller.CheckRequirements(ctx, newSCtx); err2 != nil {
		return err2
	}
	if err := e.controller.InitTiKVConfigs(ctx, newSCtx); err != nil {
		return err
	}

	// Each run gets a fresh UUID so its log lines can be correlated.
	importID := uuid.New().String()
	logutil.Logger(ctx).Info("importing data from select statement",
		zap.String("import-id", importID), zap.Int("concurrency", e.controller.ThreadCnt),
		zap.String("target-table", e.controller.FullTableName()),
		zap.Int64("target-table-id", e.controller.TableInfo.ID))
	ti, err2 := importer.NewTableImporter(ctx, e.controller, importID, e.Ctx().GetStore())
	if err2 != nil {
		return err2
	}
	defer func() {
		if err := ti.Close(); err != nil {
			logutil.Logger(ctx).Error("close importer failed", zap.Error(err))
		}
	}()
	// Buffered by one chunk so the producer can stay one step ahead.
	selectedChunkCh := make(chan importer.QueryChunk, 1)
	ti.SetSelectedChunkCh(selectedChunkCh)

	var importedRows int64
	eg, egCtx := errgroup.WithContext(ctx)
	// Consumer goroutine: imports the rows pushed into selectedChunkCh.
	eg.Go(func() error {
		var err error
		importedRows, err = ti.ImportSelectedRows(egCtx, newSCtx)
		return err
	})
	// Producer goroutine: drains the SELECT executor and forwards its chunks.
	// Closing selectedChunkCh (via defer) tells the consumer there is no more data.
	eg.Go(func() error {
		defer close(selectedChunkCh)
		fields := exec.RetTypes(e.selectExec)
		// idAllocator hands out consecutive row-ID offsets across chunks.
		var idAllocator int64
		chkSize := e.selectExec.InitCap()
		maxChkSize := e.selectExec.MaxChunkSize()
		for {
			// rows will be consumed concurrently, we cannot use chunk pool in session ctx.
			chk := chunk.New(e.selectExec.RetFieldTypes(), chkSize, maxChkSize)
			err := exec.Next(egCtx, e.selectExec, chk)
			if err != nil {
				return err
			}
			if chk.NumRows() == 0 {
				// SELECT exhausted.
				break
			}
			select {
			case selectedChunkCh <- importer.QueryChunk{
				Fields:      fields,
				Chk:         chk,
				RowIDOffset: idAllocator,
			}:
				idAllocator += int64(chk.NumRows())
			case <-egCtx.Done():
				// Consumer failed or caller canceled; stop producing.
				return egCtx.Err()
			}
			// Grow the chunk size (doubling, capped at maxChkSize) to
			// amortize allocation cost for large result sets.
			if chkSize < maxChkSize {
				chkSize = chkSize * 2
				chkSize = min(chkSize, maxChkSize)
			}
		}
		return nil
	})
	if err := eg.Wait(); err != nil {
		return err
	}

	// A stats-flush failure is logged but does not fail the import.
	if err2 = importer.FlushTableStats(ctx, newSCtx, e.controller.TableInfo.ID, importedRows); err2 != nil {
		logutil.Logger(ctx).Error("flush stats failed", zap.Error(err2))
	}
	stmtCtx := e.userSctx.GetSessionVars().StmtCtx
	stmtCtx.SetAffectedRows(uint64(importedRows))
	// TODO: change it after spec is ready.
	stmtCtx.SetMessage(fmt.Sprintf("Records: %d, ID: %s", importedRows, importID))
	return nil
}
// Close implements the Executor interface. It releases the controller (when
// one was created) before closing the base executor.
func (e *ImportIntoExec) Close() error {
	if c := e.controller; c != nil {
		c.Close()
	}
	return e.BaseExecutor.Close()
}
// ImportIntoActionExec represents a import into action executor.
type ImportIntoActionExec struct {
	exec.BaseExecutor

	// tp is the requested action type (this executor currently performs CANCEL;
	// see Next).
	tp ast.ImportIntoActionTp
	// jobID identifies the target import job.
	jobID int64
}

// Compile-time check that ImportIntoActionExec satisfies the Executor interface.
var (
	_ exec.Executor = (*ImportIntoActionExec)(nil)
)
// Next implements the Executor Next interface.
//
// It verifies the user's privilege and the job's state, then cancels the
// underlying DXF task and waits until it has fully stopped.
func (e *ImportIntoActionExec) Next(ctx context.Context, _ *chunk.Chunk) (err error) {
	ctx = kv.WithInternalSourceType(ctx, kv.InternalImportInto)

	hasSuperPriv := false
	if pm := privilege.GetPrivilegeManager(e.Ctx()); pm != nil {
		hasSuperPriv = pm.RequestVerification(e.Ctx().GetSessionVars().ActiveRoles, "", "", "", mysql.SuperPriv)
	}
	// we use sessionCtx from GetTaskManager, user ctx might not have enough privileges.
	taskManager, err := dxfstorage.GetTaskManager()
	ctx = util.WithInternalSourceType(ctx, kv.InternalDistTask)
	if err != nil {
		return err
	}
	if err = e.checkPrivilegeAndStatus(ctx, taskManager, hasSuperPriv); err != nil {
		return err
	}

	logTask := log.BeginTask(logutil.Logger(ctx).With(zap.Int64("jobID", e.jobID),
		zap.Any("action", e.tp)), "import into action")
	defer func() {
		logTask.End(zap.ErrorLevel, err)
	}()
	return cancelAndWaitImportJob(ctx, e.jobID)
}
// checkPrivilegeAndStatus verifies that the current user can see the job (its
// owner, or any user with SUPER privilege) and that the job is still in a
// state that allows cancellation.
func (e *ImportIntoActionExec) checkPrivilegeAndStatus(ctx context.Context, manager *dxfstorage.TaskManager, hasSuperPriv bool) error {
	var info *importer.JobInfo
	if err := manager.WithNewSession(func(se sessionctx.Context) error {
		// Named sqlExec (not exec) to avoid shadowing the imported exec
		// package, and for consistency with fillJobInfo above.
		sqlExec := se.GetSQLExecutor()
		var err2 error
		info, err2 = importer.GetJob(ctx, sqlExec, e.jobID, e.Ctx().GetSessionVars().User.String(), hasSuperPriv)
		return err2
	}); err != nil {
		return err
	}
	if !info.CanCancel() {
		return exeerrors.ErrLoadDataInvalidOperation.FastGenByArgs("CANCEL")
	}
	return nil
}
// cancelAndWaitImportJob cancels the DXF task backing jobID and blocks until
// that task is fully done.
func cancelAndWaitImportJob(ctx context.Context, jobID int64) error {
	manager, err := dxfstorage.GetDXFSvcTaskMgr()
	if err != nil {
		return err
	}
	// Wrap once up front (pure) instead of mutating the captured ctx inside
	// the transaction closure; the wrapped ctx is used for both the cancel
	// and the wait below.
	ctx = util.WithInternalSourceType(ctx, kv.InternalDistTask)
	taskKey := importinto.TaskKey(jobID)
	if txnErr := manager.WithNewTxn(ctx, func(se sessionctx.Context) error {
		return manager.CancelTaskByKeySession(ctx, se, taskKey)
	}); txnErr != nil {
		return txnErr
	}
	return handle.WaitTaskDoneByKey(ctx, taskKey)
}