286 lines
11 KiB
Go
286 lines
11 KiB
Go
// Copyright 2015 PingCAP, Inc.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package sqlexec
|
|
|
|
import (
|
|
"context"
|
|
|
|
"github.com/pingcap/tidb/pkg/parser"
|
|
"github.com/pingcap/tidb/pkg/parser/ast"
|
|
"github.com/pingcap/tidb/pkg/parser/terror"
|
|
"github.com/pingcap/tidb/pkg/planner/core/resolve"
|
|
"github.com/pingcap/tidb/pkg/sessionctx/sysproctrack"
|
|
"github.com/pingcap/tidb/pkg/sessionctx/variable"
|
|
"github.com/pingcap/tidb/pkg/util/chunk"
|
|
"github.com/pingcap/tidb/pkg/util/logutil"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
// RestrictedSQLExecutor is an interface provides executing restricted sql statement.
|
|
// Why we need this interface?
|
|
// When we execute some management statements, we need to operate system tables.
|
|
// For example when executing create user statement, we need to check if the user already
|
|
// exists in the mysql.User table and insert a new row if not exists. In this case, we need
|
|
// a convenience way to manipulate system tables. The most simple way is executing sql statement.
|
|
// In order to execute sql statement in stmts package, we add this interface to solve dependence problem.
|
|
// And in the same time, we do not want this interface becomes a general way to run sql statement.
|
|
// We hope this could be used with some restrictions such as only allowing system tables as target,
|
|
// do not allowing recursion call.
|
|
// This is implemented in session.go.
|
|
type RestrictedSQLExecutor interface {
|
|
// ParseWithParams is the parameterized version of Parse: it will try to prevent injection under utf8mb4.
|
|
// It works like printf() in c, there are following format specifiers:
|
|
// 1. %?: automatic conversion by the type of arguments. E.g. []string -> ('s1','s2'..)
|
|
// 2. %%: output %
|
|
// 3. %n: for identifiers, for example ("use %n", db)
|
|
//
|
|
// Attention: it does not prevent you from doing parse("select '%?", ";SQL injection!;") => "select '';SQL injection!;'".
|
|
// One argument should be a standalone entity. It should not "concat" with other placeholders and characters.
|
|
// This function only saves you from processing potentially unsafe parameters.
|
|
ParseWithParams(ctx context.Context, sql string, args ...any) (ast.StmtNode, error)
|
|
// ExecRestrictedStmt run sql statement in ctx with some restrictions.
|
|
ExecRestrictedStmt(ctx context.Context, stmt ast.StmtNode, opts ...OptionFuncAlias) ([]chunk.Row, []*resolve.ResultField, error)
|
|
// ExecRestrictedSQL run sql string in ctx with internal session.
|
|
ExecRestrictedSQL(ctx context.Context, opts []OptionFuncAlias, sql string, args ...any) ([]chunk.Row, []*resolve.ResultField, error)
|
|
}
|
|
|
|
// ExecOption is a struct defined for ExecRestrictedStmt/SQL option.
|
|
type ExecOption struct {
|
|
AnalyzeSnapshot *bool
|
|
TrackSysProc func(id uint64, ctx sysproctrack.TrackProc) error
|
|
UnTrackSysProc func(id uint64)
|
|
PartitionPruneMode string
|
|
SnapshotTS uint64
|
|
AnalyzeVer int
|
|
TrackSysProcID uint64
|
|
IgnoreWarning bool
|
|
UseCurSession bool
|
|
EnableDDLAnalyze bool
|
|
}
|
|
|
|
// OptionFuncAlias is defined for the optional parameter of ExecRestrictedStmt/SQL.
|
|
type OptionFuncAlias = func(option *ExecOption)
|
|
|
|
// ExecOptionIgnoreWarning tells ExecRestrictedStmt/SQL to ignore the warnings.
|
|
var ExecOptionIgnoreWarning = func(option *ExecOption) {
|
|
option.IgnoreWarning = true
|
|
}
|
|
|
|
// ExecOptionEnableDDLAnalyze tells ExecRestrictedStmt/SQL analyze to include reorg state index.
|
|
var ExecOptionEnableDDLAnalyze = func(option *ExecOption) {
|
|
option.EnableDDLAnalyze = true
|
|
}
|
|
|
|
// ExecOptionAnalyzeVer1 tells ExecRestrictedStmt/SQL to collect statistics with version1.
|
|
var ExecOptionAnalyzeVer1 = func(option *ExecOption) {
|
|
option.AnalyzeVer = 1
|
|
}
|
|
|
|
// ExecOptionAnalyzeVer2 tells ExecRestrictedStmt/SQL to collect statistics with version2.
|
|
var ExecOptionAnalyzeVer2 = func(option *ExecOption) {
|
|
option.AnalyzeVer = 2
|
|
}
|
|
|
|
// GetPartitionPruneModeOption returns a function which tells ExecRestrictedStmt/SQL to run with pruneMode.
|
|
func GetPartitionPruneModeOption(pruneMode string) OptionFuncAlias {
|
|
return func(option *ExecOption) {
|
|
option.PartitionPruneMode = pruneMode
|
|
}
|
|
}
|
|
|
|
// GetAnalyzeSnapshotOption returns a function which tells ExecRestrictedStmt/SQL to run with analyzeSnapshot.
|
|
func GetAnalyzeSnapshotOption(analyzeSnapshot bool) OptionFuncAlias {
|
|
return func(option *ExecOption) {
|
|
option.AnalyzeSnapshot = new(bool)
|
|
*option.AnalyzeSnapshot = analyzeSnapshot
|
|
}
|
|
}
|
|
|
|
// ExecOptionUseCurSession tells ExecRestrictedStmt/SQL to use current session.
|
|
var ExecOptionUseCurSession = func(option *ExecOption) {
|
|
option.UseCurSession = true
|
|
}
|
|
|
|
// ExecOptionUseSessionPool tells ExecRestrictedStmt/SQL to use session pool.
|
|
// UseCurSession is false by default, sometimes we set it explicitly for readability
|
|
var ExecOptionUseSessionPool = func(option *ExecOption) {
|
|
option.UseCurSession = false
|
|
}
|
|
|
|
// ExecOptionWithSnapshot tells ExecRestrictedStmt/SQL to use a snapshot.
|
|
func ExecOptionWithSnapshot(snapshot uint64) OptionFuncAlias {
|
|
return func(option *ExecOption) {
|
|
option.SnapshotTS = snapshot
|
|
}
|
|
}
|
|
|
|
// ExecOptionWithSysProcTrack tells ExecRestrictedStmt/SQL to track sys process.
|
|
func ExecOptionWithSysProcTrack(procID uint64, track func(id uint64, ctx sysproctrack.TrackProc) error, untrack func(id uint64)) OptionFuncAlias {
|
|
return func(option *ExecOption) {
|
|
option.TrackSysProcID = procID
|
|
option.TrackSysProc = track
|
|
option.UnTrackSysProc = untrack
|
|
}
|
|
}
|
|
|
|
// GetExecOption applies OptionFuncs and return ExecOption
|
|
func GetExecOption(opts []OptionFuncAlias) ExecOption {
|
|
var execOption ExecOption
|
|
for _, opt := range opts {
|
|
opt(&execOption)
|
|
}
|
|
return execOption
|
|
}
|
|
|
|
// SQLExecutor is an interface provides executing normal sql statement.
|
|
// Why we need this interface? To break circle dependence of packages.
|
|
// For example, privilege/privileges package need execute SQL, if it use
|
|
// session.Session.Execute, then privilege/privileges and tidb would become a circle.
|
|
type SQLExecutor interface {
|
|
// Execute is only used by plugins. It can be removed soon.
|
|
Execute(ctx context.Context, sql string) ([]RecordSet, error)
|
|
// ExecuteInternal means execute sql as the internal sql.
|
|
ExecuteInternal(ctx context.Context, sql string, args ...any) (RecordSet, error)
|
|
ExecuteStmt(ctx context.Context, stmtNode ast.StmtNode) (RecordSet, error)
|
|
}
|
|
|
|
// SQLParser is an interface provides parsing sql statement.
|
|
// To parse a sql statement, we could run parser.New() to get a parser object, and then run Parse method on it.
|
|
// But a session already has a parser bind in it, so we define this interface and use session as its implementation,
|
|
// thus avoid allocating new parser. See session.SQLParser for more information.
|
|
type SQLParser interface {
|
|
ParseSQL(ctx context.Context, sql string, params ...parser.ParseParam) ([]ast.StmtNode, []error, error)
|
|
}
|
|
|
|
// Statement is an interface for SQL execution.
|
|
// NOTE: all Statement implementations must be safe for
|
|
// concurrent using by multiple goroutines.
|
|
// If the Exec method requires any Execution domain local data,
|
|
// they must be held out of the implementing instance.
|
|
type Statement interface {
|
|
// OriginText gets the origin SQL text.
|
|
OriginText() string
|
|
|
|
// Text gets the utf8 encoded SQL text.
|
|
Text() string
|
|
|
|
// GetTextToLog gets the desensitization SQL text for logging.
|
|
GetTextToLog(keepHint bool) string
|
|
|
|
// Exec executes SQL and gets a Recordset.
|
|
Exec(ctx context.Context) (RecordSet, error)
|
|
|
|
// IsPrepared returns whether this statement is prepared statement.
|
|
IsPrepared() bool
|
|
|
|
// IsReadOnly returns if the statement is read only. For example: SelectStmt without lock.
|
|
IsReadOnly(vars *variable.SessionVars) bool
|
|
|
|
// RebuildPlan rebuilds the plan of the statement.
|
|
RebuildPlan(ctx context.Context) (schemaVersion int64, err error)
|
|
|
|
// GetStmtNode returns the stmtNode inside Statement
|
|
GetStmtNode() ast.StmtNode
|
|
}
|
|
|
|
// RecordSet is an abstract result set interface to help get data from Plan.
|
|
type RecordSet interface {
|
|
// Fields gets result fields.
|
|
Fields() []*resolve.ResultField
|
|
|
|
// Next reads records into chunk.
|
|
Next(ctx context.Context, req *chunk.Chunk) error
|
|
|
|
// NewChunk create a chunk, if allocator is nil, the default one is used.
|
|
NewChunk(chunk.Allocator) *chunk.Chunk
|
|
|
|
// Close closes the underlying iterator, call Next after Close will
|
|
// restart the iteration.
|
|
Close() error
|
|
}
|
|
|
|
// DetachableRecordSet extends the `RecordSet` to support detaching from current session context
|
|
type DetachableRecordSet interface {
|
|
RecordSet
|
|
|
|
// TryDetach detaches the record set from the current session context.
|
|
//
|
|
// The last two return value indicates whether the record set is suitable for detaching, and whether
|
|
// it detaches successfully. If it faces any error during detaching (and there is no way to rollback),
|
|
// it will return the error. If an error is returned, the record set (and session) will be left at
|
|
// an unknown state.
|
|
//
|
|
// If the caller receives `_, false, _`, it means the original record set can still be used. If the caller
|
|
// receives an error, it means the original record set (and the session) is dirty.
|
|
TryDetach() (RecordSet, bool, error)
|
|
}
|
|
|
|
// MultiQueryNoDelayResult is an interface for one no-delay result for one statement in multi-queries.
|
|
type MultiQueryNoDelayResult interface {
|
|
// AffectedRows return affected row for one statement in multi-queries.
|
|
AffectedRows() uint64
|
|
// LastMessage return last message for one statement in multi-queries.
|
|
LastMessage() string
|
|
// WarnCount return warn count for one statement in multi-queries.
|
|
WarnCount() uint16
|
|
// Status return status when executing one statement in multi-queries.
|
|
Status() uint16
|
|
// LastInsertID return last insert id for one statement in multi-queries.
|
|
LastInsertID() uint64
|
|
}
|
|
|
|
// DrainRecordSet fetches the rows in the RecordSet.
|
|
func DrainRecordSet(ctx context.Context, rs RecordSet, maxChunkSize int) ([]chunk.Row, error) {
|
|
var rows []chunk.Row
|
|
req := rs.NewChunk(nil)
|
|
for {
|
|
err := rs.Next(ctx, req)
|
|
if err != nil || req.NumRows() == 0 {
|
|
return rows, err
|
|
}
|
|
iter := chunk.NewIterator4Chunk(req)
|
|
for r := iter.Begin(); r != iter.End(); r = iter.Next() {
|
|
rows = append(rows, r)
|
|
}
|
|
req = chunk.Renew(req, maxChunkSize)
|
|
}
|
|
}
|
|
|
|
// DrainRecordSetAndClose fetches the rows in the RecordSet and closes it.
|
|
func DrainRecordSetAndClose(ctx context.Context, rs RecordSet, maxChunkSize int) ([]chunk.Row, error) {
|
|
defer func() {
|
|
if closeErr := rs.Close(); closeErr != nil {
|
|
// Log the close error but don't override the main error
|
|
logutil.BgLogger().Error("failed to close recordSet in DrainRecordSetAndClose", zap.Error(closeErr))
|
|
}
|
|
}()
|
|
|
|
return DrainRecordSet(ctx, rs, maxChunkSize)
|
|
}
|
|
|
|
// ExecSQL executes the sql and returns the result.
|
|
// TODO: consider retry.
|
|
func ExecSQL(ctx context.Context, exec SQLExecutor, sql string, args ...any) ([]chunk.Row, error) {
|
|
rs, err := exec.ExecuteInternal(ctx, sql, args...)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if rs != nil {
|
|
defer terror.Call(rs.Close)
|
|
return DrainRecordSet(ctx, rs, 1024)
|
|
}
|
|
return nil, nil
|
|
}
|