Files
tidb/pkg/executor/internal/exec/executor.go

482 lines
14 KiB
Go

// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package exec
import (
"context"
"reflect"
"time"
"github.com/ngaut/pools"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/domain"
"github.com/pingcap/tidb/pkg/expression"
"github.com/pingcap/tidb/pkg/parser"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/stmtctx"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/chunk"
"github.com/pingcap/tidb/pkg/util/execdetails"
"github.com/pingcap/tidb/pkg/util/linter/constructor"
"github.com/pingcap/tidb/pkg/util/topsql"
topsqlstate "github.com/pingcap/tidb/pkg/util/topsql/state"
"github.com/pingcap/tidb/pkg/util/tracing"
"go.uber.org/atomic"
)
// Executor is the physical implementation of an algebra operator.
//
// In TiDB, all algebra operators are implemented as iterators, i.e., they
// support a simple Open-Next-Close protocol. See this paper for more details:
//
// "Volcano-An Extensible and Parallel Query Evaluation System"
//
// Different from Volcano's execution model, a "Next" function call in TiDB will
// return a batch of rows, other than a single row in Volcano.
// NOTE: Executors must call "chk.Reset()" before appending their results to it.
type Executor interface {
NewChunk() *chunk.Chunk
NewChunkWithCapacity(fields []*types.FieldType, capacity int, maxCachesize int) *chunk.Chunk
RuntimeStats() *execdetails.BasicRuntimeStats
HandleSQLKillerSignal() error
RegisterSQLAndPlanInExecForTopSQL()
AllChildren() []Executor
SetAllChildren([]Executor)
Open(context.Context) error
Next(ctx context.Context, req *chunk.Chunk) error
// `Close()` may be called at any time after `Open()` and it may be called with `Next()` at the same time
Close() error
Schema() *expression.Schema
RetFieldTypes() []*types.FieldType
InitCap() int
MaxChunkSize() int
// Detach detaches the current executor from the session context without considering its children.
//
// It has to make sure, no matter whether it returns true or false, both the original executor and the returning executor
// should be able to be used correctly.
Detach() (Executor, bool)
}
var _ Executor = &BaseExecutor{}
// executorChunkAllocator is a helper to implement `Chunk` related methods in `Executor` interface
type executorChunkAllocator struct {
AllocPool chunk.Allocator
retFieldTypes []*types.FieldType
initCap int
maxChunkSize int
}
// newExecutorChunkAllocator creates a new `executorChunkAllocator`
func newExecutorChunkAllocator(vars *variable.SessionVars, retFieldTypes []*types.FieldType) executorChunkAllocator {
return executorChunkAllocator{
AllocPool: vars.GetChunkAllocator(),
initCap: vars.InitChunkSize,
maxChunkSize: vars.MaxChunkSize,
retFieldTypes: retFieldTypes,
}
}
// InitCap returns the initial capacity for chunk
func (e *executorChunkAllocator) InitCap() int {
failpoint.Inject("initCap", func(val failpoint.Value) {
failpoint.Return(val.(int))
})
return e.initCap
}
// SetInitCap sets the initial capacity for chunk
func (e *executorChunkAllocator) SetInitCap(c int) {
e.initCap = c
}
// MaxChunkSize returns the max chunk size.
func (e *executorChunkAllocator) MaxChunkSize() int {
failpoint.Inject("maxChunkSize", func(val failpoint.Value) {
failpoint.Return(val.(int))
})
return e.maxChunkSize
}
// SetMaxChunkSize sets the max chunk size.
func (e *executorChunkAllocator) SetMaxChunkSize(size int) {
e.maxChunkSize = size
}
// NewChunk creates a new chunk according to the executor configuration
func (e *executorChunkAllocator) NewChunk() *chunk.Chunk {
return e.NewChunkWithCapacity(e.retFieldTypes, e.InitCap(), e.MaxChunkSize())
}
// NewChunkWithCapacity allows the caller to allocate the chunk with any types, capacity and max size in the pool
func (e *executorChunkAllocator) NewChunkWithCapacity(fields []*types.FieldType, capacity int, maxCachesize int) *chunk.Chunk {
return e.AllocPool.Alloc(fields, capacity, maxCachesize)
}
// executorMeta is a helper to store metadata for an execturo and implement the getter
type executorMeta struct {
schema *expression.Schema
children []Executor
retFieldTypes []*types.FieldType
id int
}
// newExecutorMeta creates a new `executorMeta`
func newExecutorMeta(schema *expression.Schema, id int, children ...Executor) executorMeta {
e := executorMeta{
id: id,
schema: schema,
children: children,
}
if schema != nil {
cols := schema.Columns
e.retFieldTypes = make([]*types.FieldType, len(cols))
for i := range cols {
e.retFieldTypes[i] = cols[i].RetType
}
}
return e
}
// NewChunkWithCapacity allows the caller to allocate the chunk with any types, capacity and max size in the pool
func (e *executorMeta) RetFieldTypes() []*types.FieldType {
return e.retFieldTypes
}
// ID returns the id of an executor.
func (e *executorMeta) ID() int {
return e.id
}
// AllChildren returns all children.
func (e *executorMeta) AllChildren() []Executor {
return e.children
}
// SetAllChildren sets the children for an executor.
func (e *executorMeta) SetAllChildren(children []Executor) {
e.children = children
}
// ChildrenLen returns the length of children.
func (e *executorMeta) ChildrenLen() int {
return len(e.children)
}
// EmptyChildren judges whether the children is empty.
func (e *executorMeta) EmptyChildren() bool {
return len(e.children) == 0
}
// SetChildren sets a child for an executor.
func (e *executorMeta) SetChildren(idx int, ex Executor) {
e.children[idx] = ex
}
// Children returns the children for an executor.
func (e *executorMeta) Children(idx int) Executor {
return e.children[idx]
}
// Schema returns the current BaseExecutor's schema. If it is nil, then create and return a new one.
func (e *executorMeta) Schema() *expression.Schema {
if e.schema == nil {
return expression.NewSchema()
}
return e.schema
}
// GetSchema gets the schema.
func (e *executorMeta) GetSchema() *expression.Schema {
return e.schema
}
// executorStats is a helper to implement the stats related methods for `Executor`
type executorStats struct {
runtimeStats *execdetails.BasicRuntimeStats
isSQLAndPlanRegistered *atomic.Bool
sqlDigest *parser.Digest
planDigest *parser.Digest
normalizedSQL string
normalizedPlan string
inRestrictedSQL bool
}
// newExecutorStats creates a new `executorStats`
func newExecutorStats(stmtCtx *stmtctx.StatementContext, id int) executorStats {
normalizedSQL, sqlDigest := stmtCtx.SQLDigest()
normalizedPlan, planDigest := stmtCtx.GetPlanDigest()
e := executorStats{
isSQLAndPlanRegistered: &stmtCtx.IsSQLAndPlanRegistered,
normalizedSQL: normalizedSQL,
sqlDigest: sqlDigest,
normalizedPlan: normalizedPlan,
planDigest: planDigest,
inRestrictedSQL: stmtCtx.InRestrictedSQL,
}
if stmtCtx.RuntimeStatsColl != nil {
if id > 0 {
e.runtimeStats = stmtCtx.RuntimeStatsColl.GetBasicRuntimeStats(id, true)
}
}
return e
}
// RuntimeStats returns the runtime stats of an executor.
func (e *executorStats) RuntimeStats() *execdetails.BasicRuntimeStats {
return e.runtimeStats
}
// RegisterSQLAndPlanInExecForTopSQL registers the current SQL and Plan on top sql
func (e *executorStats) RegisterSQLAndPlanInExecForTopSQL() {
if topsqlstate.TopSQLEnabled() && e.isSQLAndPlanRegistered.CompareAndSwap(false, true) {
topsql.RegisterSQL(e.normalizedSQL, e.sqlDigest, e.inRestrictedSQL)
if len(e.normalizedPlan) > 0 {
topsql.RegisterPlan(e.normalizedPlan, e.planDigest)
}
}
}
type signalHandler interface {
HandleSignal() error
}
// executorKillerHandler is a helper to implement the killer related methods for `Executor`.
type executorKillerHandler struct {
handler signalHandler
}
func (e *executorKillerHandler) HandleSQLKillerSignal() error {
return e.handler.HandleSignal()
}
func newExecutorKillerHandler(handler signalHandler) executorKillerHandler {
return executorKillerHandler{handler}
}
// BaseExecutorV2 is a simplified version of `BaseExecutor`, which doesn't contain a full session context
type BaseExecutorV2 struct {
_ constructor.Constructor `ctor:"NewBaseExecutorV2,BuildNewBaseExecutorV2"`
executorMeta
executorKillerHandler
executorStats
executorChunkAllocator
}
// NewBaseExecutorV2 creates a new BaseExecutorV2 instance.
func NewBaseExecutorV2(vars *variable.SessionVars, schema *expression.Schema, id int, children ...Executor) BaseExecutorV2 {
executorMeta := newExecutorMeta(schema, id, children...)
e := BaseExecutorV2{
executorMeta: executorMeta,
executorStats: newExecutorStats(vars.StmtCtx, id),
executorChunkAllocator: newExecutorChunkAllocator(vars, executorMeta.RetFieldTypes()),
executorKillerHandler: newExecutorKillerHandler(&vars.SQLKiller),
}
return e
}
// Open initializes children recursively and "childrenResults" according to children's schemas.
func (e *BaseExecutorV2) Open(ctx context.Context) error {
for _, child := range e.children {
err := Open(ctx, child)
if err != nil {
return err
}
}
return nil
}
// Close closes all executors and release all resources.
func (e *BaseExecutorV2) Close() error {
var firstErr error
for _, src := range e.children {
if err := Close(src); err != nil && firstErr == nil {
firstErr = err
}
}
return firstErr
}
// Next fills multiple rows into a chunk.
func (*BaseExecutorV2) Next(_ context.Context, _ *chunk.Chunk) error {
return nil
}
// Detach detaches the current executor from the session context.
func (*BaseExecutorV2) Detach() (Executor, bool) {
return nil, false
}
// BuildNewBaseExecutorV2 builds a new `BaseExecutorV2` based on the configuration of the current base executor.
// It's used to build a new sub-executor from an existing executor. For example, the `IndexLookUpExecutor` will use
// this function to build `TableReaderExecutor`
func (e *BaseExecutorV2) BuildNewBaseExecutorV2(stmtRuntimeStatsColl *execdetails.RuntimeStatsColl, schema *expression.Schema, id int, children ...Executor) BaseExecutorV2 {
newExecutorMeta := newExecutorMeta(schema, id, children...)
newExecutorStats := e.executorStats
if stmtRuntimeStatsColl != nil {
if id > 0 {
newExecutorStats.runtimeStats = stmtRuntimeStatsColl.GetBasicRuntimeStats(id, true)
}
}
newChunkAllocator := e.executorChunkAllocator
newChunkAllocator.retFieldTypes = newExecutorMeta.RetFieldTypes()
newE := BaseExecutorV2{
executorMeta: newExecutorMeta,
executorStats: newExecutorStats,
executorChunkAllocator: newChunkAllocator,
executorKillerHandler: e.executorKillerHandler,
}
return newE
}
// BaseExecutor holds common information for executors.
type BaseExecutor struct {
_ constructor.Constructor `ctor:"NewBaseExecutor"`
ctx sessionctx.Context
BaseExecutorV2
}
// NewBaseExecutor creates a new BaseExecutor instance.
func NewBaseExecutor(ctx sessionctx.Context, schema *expression.Schema, id int, children ...Executor) BaseExecutor {
return BaseExecutor{
ctx: ctx,
BaseExecutorV2: NewBaseExecutorV2(ctx.GetSessionVars(), schema, id, children...),
}
}
// Ctx return ```sessionctx.Context``` of Executor
func (e *BaseExecutor) Ctx() sessionctx.Context {
return e.ctx
}
// UpdateDeltaForTableID updates the delta info for the table with tableID.
func (e *BaseExecutor) UpdateDeltaForTableID(id int64) {
txnCtx := e.ctx.GetSessionVars().TxnCtx
txnCtx.UpdateDeltaForTable(id, 0, 0)
}
// GetSysSession gets a system session context from executor.
func (e *BaseExecutor) GetSysSession() (sessionctx.Context, error) {
dom := domain.GetDomain(e.Ctx())
sysSessionPool := dom.SysSessionPool()
ctx, err := sysSessionPool.Get()
if err != nil {
return nil, err
}
restrictedCtx := ctx.(sessionctx.Context)
restrictedCtx.GetSessionVars().InRestrictedSQL = true
return restrictedCtx, nil
}
// ReleaseSysSession releases a system session context to executor.
func (e *BaseExecutor) ReleaseSysSession(ctx context.Context, sctx sessionctx.Context) {
if sctx == nil {
return
}
dom := domain.GetDomain(e.Ctx())
sysSessionPool := dom.SysSessionPool()
if _, err := sctx.GetSQLExecutor().ExecuteInternal(ctx, "rollback"); err != nil {
sctx.(pools.Resource).Close()
return
}
sysSessionPool.Put(sctx.(pools.Resource))
}
// TryNewCacheChunk tries to get a cached chunk
func TryNewCacheChunk(e Executor) *chunk.Chunk {
return e.NewChunk()
}
// RetTypes returns all output column types.
func RetTypes(e Executor) []*types.FieldType {
return e.RetFieldTypes()
}
// NewFirstChunk creates a new chunk to buffer current executor's result.
func NewFirstChunk(e Executor) *chunk.Chunk {
return chunk.New(e.RetFieldTypes(), e.InitCap(), e.MaxChunkSize())
}
// Open is a wrapper function on e.Open(), it handles some common codes.
func Open(ctx context.Context, e Executor) (err error) {
defer func() {
if r := recover(); r != nil {
err = util.GetRecoverError(r)
}
}()
if e.RuntimeStats() != nil {
start := time.Now()
defer func() { e.RuntimeStats().RecordOpen(time.Since(start)) }()
}
return e.Open(ctx)
}
// Next is a wrapper function on e.Next(), it handles some common codes.
func Next(ctx context.Context, e Executor, req *chunk.Chunk) (err error) {
defer func() {
if r := recover(); r != nil {
err = util.GetRecoverError(r)
}
}()
if e.RuntimeStats() != nil {
start := time.Now()
defer func() { e.RuntimeStats().Record(time.Since(start), req.NumRows()) }()
}
if err := e.HandleSQLKillerSignal(); err != nil {
return err
}
r, ctx := tracing.StartRegionEx(ctx, reflect.TypeOf(e).String()+".Next")
defer r.End()
e.RegisterSQLAndPlanInExecForTopSQL()
err = e.Next(ctx, req)
if err != nil {
return err
}
// recheck whether the session/query is killed during the Next()
return e.HandleSQLKillerSignal()
}
// Close is a wrapper function on e.Close(), it handles some common codes.
func Close(e Executor) (err error) {
defer func() {
if r := recover(); r != nil {
err = util.GetRecoverError(r)
}
}()
if e.RuntimeStats() != nil {
start := time.Now()
defer func() { e.RuntimeStats().RecordClose(time.Since(start)) }()
}
return e.Close()
}