executor: refactor the BaseExecutor and remove session context in ProjectionExec (#54614)

close pingcap/tidb#54613
This commit is contained in:
YangKeao
2024-07-17 12:10:29 +08:00
committed by GitHub
parent e1626a9c5b
commit b5bbbf471c
4 changed files with 46 additions and 24 deletions

View File

@ -1803,9 +1803,10 @@ func benchmarkLimitExec(b *testing.B, cas *testutil.LimitCase) {
}
}
proj := &ProjectionExec{
BaseExecutor: exec.NewBaseExecutor(cas.Ctx, expression.NewSchema(usedCols...), 0, limit),
numWorkers: 1,
evaluatorSuit: expression.NewEvaluatorSuite(exprs, false),
projectionExecutorContext: newProjectionExecutorContext(cas.Ctx),
BaseExecutorV2: exec.NewBaseExecutorV2(cas.Ctx.GetSessionVars(), expression.NewSchema(usedCols...), 0, limit),
numWorkers: 1,
evaluatorSuit: expression.NewEvaluatorSuite(exprs, false),
}
exe = proj
}

View File

@ -1975,10 +1975,11 @@ func (b *executorBuilder) buildProjection(v *plannercore.PhysicalProjection) exe
return nil
}
e := &ProjectionExec{
BaseExecutor: exec.NewBaseExecutor(b.ctx, v.Schema(), v.ID(), childExec),
numWorkers: int64(b.ctx.GetSessionVars().ProjectionConcurrency()),
evaluatorSuit: expression.NewEvaluatorSuite(v.Exprs, v.AvoidColumnEvaluator),
calculateNoDelay: v.CalculateNoDelay,
projectionExecutorContext: newProjectionExecutorContext(b.ctx),
BaseExecutorV2: exec.NewBaseExecutorV2(b.ctx.GetSessionVars(), v.Schema(), v.ID(), childExec),
numWorkers: int64(b.ctx.GetSessionVars().ProjectionConcurrency()),
evaluatorSuit: expression.NewEvaluatorSuite(v.Exprs, v.AvoidColumnEvaluator),
calculateNoDelay: v.CalculateNoDelay,
}
// If the calculation row count for this Projection operator is smaller
@ -4732,10 +4733,11 @@ func (builder *dataReaderBuilder) buildProjectionForIndexJoin(
}()
e := &ProjectionExec{
BaseExecutor: exec.NewBaseExecutor(builder.ctx, v.Schema(), v.ID(), childExec),
numWorkers: int64(builder.ctx.GetSessionVars().ProjectionConcurrency()),
evaluatorSuit: expression.NewEvaluatorSuite(v.Exprs, v.AvoidColumnEvaluator),
calculateNoDelay: v.CalculateNoDelay,
projectionExecutorContext: newProjectionExecutorContext(builder.ctx),
BaseExecutorV2: exec.NewBaseExecutorV2(builder.ctx.GetSessionVars(), v.Schema(), v.ID(), childExec),
numWorkers: int64(builder.ctx.GetSessionVars().ProjectionConcurrency()),
evaluatorSuit: expression.NewEvaluatorSuite(v.Exprs, v.AvoidColumnEvaluator),
calculateNoDelay: v.CalculateNoDelay,
}
// If the calculation row count for this Projection operator is smaller

View File

@ -600,9 +600,10 @@ func TestProjectionParallelRequiredRows(t *testing.T) {
func buildProjectionExec(ctx sessionctx.Context, exprs []expression.Expression, src exec.Executor, numWorkers int) exec.Executor {
return &ProjectionExec{
BaseExecutor: exec.NewBaseExecutor(ctx, src.Schema(), 0, src),
numWorkers: int64(numWorkers),
evaluatorSuit: expression.NewEvaluatorSuite(exprs, false),
projectionExecutorContext: newProjectionExecutorContext(ctx),
BaseExecutorV2: exec.NewBaseExecutorV2(ctx.GetSessionVars(), src.Schema(), 0, src),
numWorkers: int64(numWorkers),
evaluatorSuit: expression.NewEvaluatorSuite(exprs, false),
}
}

View File

@ -55,10 +55,28 @@ type projectionOutput struct {
done chan error
}
// projectionExecutorContext is the execution context for the `ProjectionExec`
type projectionExecutorContext struct {
stmtMemTracker *memory.Tracker
stmtRuntimeStatsColl *execdetails.RuntimeStatsColl
evalCtx expression.EvalContext
enableVectorizedExpression bool
}
func newProjectionExecutorContext(sctx sessionctx.Context) projectionExecutorContext {
return projectionExecutorContext{
stmtMemTracker: sctx.GetSessionVars().StmtCtx.MemTracker,
stmtRuntimeStatsColl: sctx.GetSessionVars().StmtCtx.RuntimeStatsColl,
evalCtx: sctx.GetExprCtx().GetEvalCtx(),
enableVectorizedExpression: sctx.GetSessionVars().EnableVectorizedExpression,
}
}
// ProjectionExec implements the physical Projection Operator:
// https://en.wikipedia.org/wiki/Projection_(relational_algebra)
type ProjectionExec struct {
exec.BaseExecutor
projectionExecutorContext
exec.BaseExecutorV2
evaluatorSuit *expression.EvaluatorSuite
@ -85,7 +103,7 @@ type ProjectionExec struct {
// Open implements the Executor Open interface.
func (e *ProjectionExec) Open(ctx context.Context) error {
if err := e.BaseExecutor.Open(ctx); err != nil {
if err := e.BaseExecutorV2.Open(ctx); err != nil {
return err
}
failpoint.Inject("mockProjectionExecBaseExecutorOpenReturnedError", func(val failpoint.Value) {
@ -105,7 +123,7 @@ func (e *ProjectionExec) open(_ context.Context) error {
} else {
e.memTracker = memory.NewTracker(e.ID(), -1)
}
e.memTracker.AttachTo(e.Ctx().GetSessionVars().StmtCtx.MemTracker)
e.memTracker.AttachTo(e.stmtMemTracker)
// For now a Projection can not be executed vectorially only because it
// contains "SetVar" or "GetVar" functions, in this scenario this
@ -204,7 +222,7 @@ func (e *ProjectionExec) unParallelExecute(ctx context.Context, chk *chunk.Chunk
if e.childResult.NumRows() == 0 {
return nil
}
err = e.evaluatorSuit.Run(e.Ctx().GetExprCtx().GetEvalCtx(), e.Ctx().GetSessionVars().EnableVectorizedExpression, e.childResult, chk)
err = e.evaluatorSuit.Run(e.evalCtx, e.enableVectorizedExpression, e.childResult, chk)
return err
}
@ -251,7 +269,7 @@ func (e *ProjectionExec) prepare(ctx context.Context) {
for i := int64(0); i < e.numWorkers; i++ {
e.workers = append(e.workers, &projectionWorker{
proj: e,
sctx: e.Ctx(),
ctx: e.projectionExecutorContext,
evaluatorSuit: e.evaluatorSuit,
globalFinishCh: e.finishCh,
inputGiveBackCh: e.fetcher.inputCh,
@ -324,16 +342,16 @@ func (e *ProjectionExec) Close() error {
e.drainOutputCh(w.outputCh)
}
}
if e.BaseExecutor.RuntimeStats() != nil {
if e.BaseExecutorV2.RuntimeStats() != nil {
runtimeStats := &execdetails.RuntimeStatsWithConcurrencyInfo{}
if e.isUnparallelExec() {
runtimeStats.SetConcurrencyInfo(execdetails.NewConcurrencyInfo("Concurrency", 0))
} else {
runtimeStats.SetConcurrencyInfo(execdetails.NewConcurrencyInfo("Concurrency", int(e.numWorkers)))
}
e.Ctx().GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.ID(), runtimeStats)
e.stmtRuntimeStatsColl.RegisterStats(e.ID(), runtimeStats)
}
return e.BaseExecutor.Close()
return e.BaseExecutorV2.Close()
}
type projectionInputFetcher struct {
@ -403,7 +421,7 @@ func (f *projectionInputFetcher) run(ctx context.Context) {
type projectionWorker struct {
proj *ProjectionExec
sctx sessionctx.Context
ctx projectionExecutorContext
evaluatorSuit *expression.EvaluatorSuite
globalFinishCh <-chan struct{}
inputGiveBackCh chan<- *projectionInput
@ -448,7 +466,7 @@ func (w *projectionWorker) run(ctx context.Context) {
}
mSize := output.chk.MemoryUsage() + input.chk.MemoryUsage()
err := w.evaluatorSuit.Run(w.sctx.GetExprCtx().GetEvalCtx(), w.sctx.GetSessionVars().EnableVectorizedExpression, input.chk, output.chk)
err := w.evaluatorSuit.Run(w.ctx.evalCtx, w.ctx.enableVectorizedExpression, input.chk, output.chk)
failpoint.Inject("ConsumeRandomPanic", nil)
w.proj.memTracker.Consume(output.chk.MemoryUsage() + input.chk.MemoryUsage() - mSize)
output.done <- err