Files
tidb/pkg/executor/internal/exec/executor.go

339 lines
10 KiB
Go

// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package exec
import (
"context"
"fmt"
"time"
"github.com/ngaut/pools"
"github.com/pingcap/tidb/pkg/domain"
"github.com/pingcap/tidb/pkg/expression"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/chunk"
"github.com/pingcap/tidb/pkg/util/execdetails"
"github.com/pingcap/tidb/pkg/util/sqlexec"
"github.com/pingcap/tidb/pkg/util/topsql"
topsqlstate "github.com/pingcap/tidb/pkg/util/topsql/state"
"github.com/pingcap/tidb/pkg/util/tracing"
)
// Executor is the physical implementation of an algebra operator.
//
// In TiDB, all algebra operators are implemented as iterators, i.e., they
// support a simple Open-Next-Close protocol. See this paper for more details:
//
// "Volcano-An Extensible and Parallel Query Evaluation System"
//
// Different from Volcano's execution model, a "Next" function call in TiDB will
// return a batch of rows, other than a single row in Volcano.
// NOTE: Executors must call "chk.Reset()" before appending their results to it.
type Executor interface {
NewChunk() *chunk.Chunk
NewChunkWithCapacity(fields []*types.FieldType, capacity int, maxCachesize int) *chunk.Chunk
RuntimeStats() *execdetails.BasicRuntimeStats
HandleSQLKillerSignal() error
RegisterSQLAndPlanInExecForTopSQL()
AllChildren() []Executor
Open(context.Context) error
Next(ctx context.Context, req *chunk.Chunk) error
Close() error
Schema() *expression.Schema
RetFieldTypes() []*types.FieldType
InitCap() int
MaxChunkSize() int
}
var _ Executor = &BaseExecutor{}
// BaseExecutor holds common information for executors.
type BaseExecutor struct {
ctx sessionctx.Context
AllocPool chunk.Allocator
schema *expression.Schema // output schema
runtimeStats *execdetails.BasicRuntimeStats
children []Executor
retFieldTypes []*types.FieldType
id int
initCap int
maxChunkSize int
}
// NewBaseExecutor creates a new BaseExecutor instance.
func NewBaseExecutor(ctx sessionctx.Context, schema *expression.Schema, id int, children ...Executor) BaseExecutor {
e := BaseExecutor{
children: children,
ctx: ctx,
id: id,
schema: schema,
initCap: ctx.GetSessionVars().InitChunkSize,
maxChunkSize: ctx.GetSessionVars().MaxChunkSize,
AllocPool: ctx.GetSessionVars().GetChunkAllocator(),
}
if ctx.GetSessionVars().StmtCtx.RuntimeStatsColl != nil {
if e.id > 0 {
e.runtimeStats = e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.GetBasicRuntimeStats(id)
}
}
if schema != nil {
cols := schema.Columns
e.retFieldTypes = make([]*types.FieldType, len(cols))
for i := range cols {
e.retFieldTypes[i] = cols[i].RetType
}
}
return e
}
// RuntimeStats returns the runtime stats of an executor.
func (e *BaseExecutor) RuntimeStats() *execdetails.BasicRuntimeStats {
return e.runtimeStats
}
// ID returns the id of an executor.
func (e *BaseExecutor) ID() int {
return e.id
}
// AllChildren returns all children.
func (e *BaseExecutor) AllChildren() []Executor {
return e.children
}
// ChildrenLen returns the length of children.
func (e *BaseExecutor) ChildrenLen() int {
return len(e.children)
}
// EmptyChildren judges whether the children is empty.
func (e *BaseExecutor) EmptyChildren() bool {
return len(e.children) == 0
}
// SetChildren sets the children for an executor.
func (e *BaseExecutor) SetChildren(idx int, ex Executor) {
e.children[idx] = ex
}
// Children returns the children for an executor.
func (e *BaseExecutor) Children(idx int) Executor {
return e.children[idx]
}
// RetFieldTypes returns the return field types of an executor.
func (e *BaseExecutor) RetFieldTypes() []*types.FieldType {
return e.retFieldTypes
}
// InitCap returns the initial capacity for chunk
func (e *BaseExecutor) InitCap() int {
return e.initCap
}
// SetInitCap sets the initial capacity for chunk
func (e *BaseExecutor) SetInitCap(c int) {
e.initCap = c
}
// MaxChunkSize returns the max chunk size.
func (e *BaseExecutor) MaxChunkSize() int {
return e.maxChunkSize
}
// SetMaxChunkSize sets the max chunk size.
func (e *BaseExecutor) SetMaxChunkSize(size int) {
e.maxChunkSize = size
}
// Open initializes children recursively and "childrenResults" according to children's schemas.
func (e *BaseExecutor) Open(ctx context.Context) error {
for _, child := range e.children {
err := Open(ctx, child)
if err != nil {
return err
}
}
return nil
}
// Close closes all executors and release all resources.
func (e *BaseExecutor) Close() error {
var firstErr error
for _, src := range e.children {
if err := Close(src); err != nil && firstErr == nil {
firstErr = err
}
}
return firstErr
}
// Schema returns the current BaseExecutor's schema. If it is nil, then create and return a new one.
func (e *BaseExecutor) Schema() *expression.Schema {
if e.schema == nil {
return expression.NewSchema()
}
return e.schema
}
// Next fills multiple rows into a chunk.
func (*BaseExecutor) Next(_ context.Context, _ *chunk.Chunk) error {
return nil
}
// Ctx return ```sessionctx.Context``` of Executor
func (e *BaseExecutor) Ctx() sessionctx.Context {
return e.ctx
}
// GetSchema gets the schema.
func (e *BaseExecutor) GetSchema() *expression.Schema {
return e.schema
}
// UpdateDeltaForTableID updates the delta info for the table with tableID.
func (e *BaseExecutor) UpdateDeltaForTableID(id int64) {
txnCtx := e.ctx.GetSessionVars().TxnCtx
txnCtx.UpdateDeltaForTable(id, 0, 0, map[int64]int64{})
}
// GetSysSession gets a system session context from executor.
func (e *BaseExecutor) GetSysSession() (sessionctx.Context, error) {
dom := domain.GetDomain(e.Ctx())
sysSessionPool := dom.SysSessionPool()
ctx, err := sysSessionPool.Get()
if err != nil {
return nil, err
}
restrictedCtx := ctx.(sessionctx.Context)
restrictedCtx.GetSessionVars().InRestrictedSQL = true
return restrictedCtx, nil
}
// ReleaseSysSession releases a system session context to executor.
func (e *BaseExecutor) ReleaseSysSession(ctx context.Context, sctx sessionctx.Context) {
if sctx == nil {
return
}
dom := domain.GetDomain(e.Ctx())
sysSessionPool := dom.SysSessionPool()
if _, err := sctx.(sqlexec.SQLExecutor).ExecuteInternal(ctx, "rollback"); err != nil {
sctx.(pools.Resource).Close()
return
}
sysSessionPool.Put(sctx.(pools.Resource))
}
// NewChunk creates a new chunk according to the executor configuration
func (e *BaseExecutor) NewChunk() *chunk.Chunk {
return e.NewChunkWithCapacity(e.RetFieldTypes(), e.InitCap(), e.MaxChunkSize())
}
// NewChunkWithCapacity allows the caller to allocate the chunk with any types, capacity and max size in the pool
func (e *BaseExecutor) NewChunkWithCapacity(fields []*types.FieldType, capacity int, maxCachesize int) *chunk.Chunk {
return e.AllocPool.Alloc(fields, capacity, maxCachesize)
}
// HandleSQLKillerSignal handles the signal sent by SQLKiller
func (e *BaseExecutor) HandleSQLKillerSignal() error {
return e.ctx.GetSessionVars().SQLKiller.HandleSignal()
}
// RegisterSQLAndPlanInExecForTopSQL registers the current SQL and Plan on top sql
// TODO: consider whether it's appropriate to have this on executor
func (e *BaseExecutor) RegisterSQLAndPlanInExecForTopSQL() {
sessVars := e.ctx.GetSessionVars()
if topsqlstate.TopSQLEnabled() && sessVars.StmtCtx.IsSQLAndPlanRegistered.CompareAndSwap(false, true) {
RegisterSQLAndPlanInExecForTopSQL(sessVars)
}
}
// TryNewCacheChunk tries to get a cached chunk
func TryNewCacheChunk(e Executor) *chunk.Chunk {
return e.NewChunk()
}
// RetTypes returns all output column types.
func RetTypes(e Executor) []*types.FieldType {
return e.RetFieldTypes()
}
// NewFirstChunk creates a new chunk to buffer current executor's result.
func NewFirstChunk(e Executor) *chunk.Chunk {
return chunk.New(e.RetFieldTypes(), e.InitCap(), e.MaxChunkSize())
}
// Open is a wrapper function on e.Open(), it handles some common codes.
func Open(ctx context.Context, e Executor) (err error) {
defer func() {
if r := recover(); r != nil {
err = util.GetRecoverError(r)
}
}()
return e.Open(ctx)
}
// Next is a wrapper function on e.Next(), it handles some common codes.
func Next(ctx context.Context, e Executor, req *chunk.Chunk) error {
if e.RuntimeStats() != nil {
start := time.Now()
defer func() { e.RuntimeStats().Record(time.Since(start), req.NumRows()) }()
}
if err := e.HandleSQLKillerSignal(); err != nil {
return err
}
r, ctx := tracing.StartRegionEx(ctx, fmt.Sprintf("%T.Next", e))
defer r.End()
e.RegisterSQLAndPlanInExecForTopSQL()
err := e.Next(ctx, req)
if err != nil {
return err
}
// recheck whether the session/query is killed during the Next()
return e.HandleSQLKillerSignal()
}
// Close is a wrapper function on e.Close(), it handles some common codes.
func Close(e Executor) (err error) {
defer func() {
if r := recover(); r != nil {
err = util.GetRecoverError(r)
}
}()
return e.Close()
}
// RegisterSQLAndPlanInExecForTopSQL register the sql and plan information if it doesn't register before execution.
// This uses to catch the running SQL when Top SQL is enabled in execution.
func RegisterSQLAndPlanInExecForTopSQL(sessVars *variable.SessionVars) {
stmtCtx := sessVars.StmtCtx
normalizedSQL, sqlDigest := stmtCtx.SQLDigest()
topsql.RegisterSQL(normalizedSQL, sqlDigest, sessVars.InRestrictedSQL)
normalizedPlan, planDigest := stmtCtx.GetPlanDigest()
if len(normalizedPlan) > 0 {
topsql.RegisterPlan(normalizedPlan, planDigest)
}
}