executor,distsql: refactor the base executor in tableReader (#51397)

close pingcap/tidb#51396
This commit is contained in:
YangKeao
2024-02-29 21:48:32 +08:00
committed by GitHub
parent 1fc91d46f8
commit e90df0c19d
31 changed files with 486 additions and 278 deletions

View File

@ -302,7 +302,7 @@ func fetchTableScanResult(
}
err = table.FillVirtualColumnValue(
copCtx.VirtualColumnsFieldTypes, copCtx.VirtualColumnsOutputOffsets,
copCtx.ExprColumnInfos, copCtx.ColumnInfos, copCtx.SessionContext, chk)
copCtx.ExprColumnInfos, copCtx.ColumnInfos, copCtx.SessionContext.GetExprCtx(), chk)
return false, err
}

View File

@ -12,6 +12,7 @@ go_library(
deps = [
"//pkg/config",
"//pkg/ddl/placement",
"//pkg/distsql/context",
"//pkg/errctx",
"//pkg/errno",
"//pkg/expression",
@ -21,7 +22,6 @@ go_library(
"//pkg/parser/mysql",
"//pkg/parser/terror",
"//pkg/planner/util",
"//pkg/sessionctx",
"//pkg/sessionctx/stmtctx",
"//pkg/sessionctx/variable",
"//pkg/store/copr",

View File

@ -0,0 +1,12 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "context",
srcs = ["context.go"],
importpath = "github.com/pingcap/tidb/pkg/distsql/context",
visibility = ["//visibility:public"],
deps = [
"//pkg/kv",
"//pkg/sessionctx/variable",
],
)

View File

@ -0,0 +1,28 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package context
import (
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
)
// DistSQLContext gives the interface
type DistSQLContext interface {
// GetSessionVars gets the session variables.
GetSessionVars() *variable.SessionVars
// GetClient gets a kv.Client.
GetClient() kv.Client
}

View File

@ -21,9 +21,9 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/tidb/pkg/config"
distsqlctx "github.com/pingcap/tidb/pkg/distsql/context"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/metrics"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/stmtctx"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/types"
@ -37,14 +37,14 @@ import (
)
// GenSelectResultFromMPPResponse generates an iterator from response.
func GenSelectResultFromMPPResponse(sctx sessionctx.Context, fieldTypes []*types.FieldType, planIDs []int, rootID int, resp kv.Response) SelectResult {
func GenSelectResultFromMPPResponse(dctx distsqlctx.DistSQLContext, fieldTypes []*types.FieldType, planIDs []int, rootID int, resp kv.Response) SelectResult {
// TODO: Add metric label and set open tracing.
return &selectResult{
label: "mpp",
resp: resp,
rowLen: len(fieldTypes),
fieldTypes: fieldTypes,
ctx: sctx,
ctx: dctx,
copPlanIDs: planIDs,
rootPlanID: rootID,
storeType: kv.TiFlash,
@ -53,7 +53,7 @@ func GenSelectResultFromMPPResponse(sctx sessionctx.Context, fieldTypes []*types
// Select sends a DAG request, returns SelectResult.
// In kvReq, KeyRanges is required, Concurrency/KeepOrder/Desc/IsolationLevel/Priority are optional.
func Select(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request, fieldTypes []*types.FieldType) (SelectResult, error) {
func Select(ctx context.Context, dctx distsqlctx.DistSQLContext, kvReq *kv.Request, fieldTypes []*types.FieldType) (SelectResult, error) {
r, ctx := tracing.StartRegionEx(ctx, "distsql.Select")
defer r.End()
@ -62,8 +62,8 @@ func Select(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request, fie
hook.(func(*kv.Request))(kvReq)
}
enabledRateLimitAction := sctx.GetSessionVars().EnabledRateLimitAction
originalSQL := sctx.GetSessionVars().StmtCtx.OriginalSQL
enabledRateLimitAction := dctx.GetSessionVars().EnabledRateLimitAction
originalSQL := dctx.GetSessionVars().StmtCtx.OriginalSQL
eventCb := func(event trxevents.TransactionEvent) {
// Note: Do not assume this callback will be invoked within the same goroutine.
if copMeetLock := event.GetCopMeetLock(); copMeetLock != nil {
@ -74,27 +74,27 @@ func Select(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request, fie
}
}
ctx = WithSQLKvExecCounterInterceptor(ctx, sctx.GetSessionVars().StmtCtx)
ctx = WithSQLKvExecCounterInterceptor(ctx, dctx.GetSessionVars().StmtCtx)
option := &kv.ClientSendOption{
SessionMemTracker: sctx.GetSessionVars().MemTracker,
SessionMemTracker: dctx.GetSessionVars().MemTracker,
EnabledRateLimitAction: enabledRateLimitAction,
EventCb: eventCb,
EnableCollectExecutionInfo: config.GetGlobalConfig().Instance.EnableCollectExecutionInfo.Load(),
}
if kvReq.StoreType == kv.TiFlash {
ctx = SetTiFlashConfVarsInContext(ctx, sctx)
option.TiFlashReplicaRead = sctx.GetSessionVars().TiFlashReplicaRead
option.AppendWarning = sctx.GetSessionVars().StmtCtx.AppendWarning
ctx = SetTiFlashConfVarsInContext(ctx, dctx.GetSessionVars())
option.TiFlashReplicaRead = dctx.GetSessionVars().TiFlashReplicaRead
option.AppendWarning = dctx.GetSessionVars().StmtCtx.AppendWarning
}
resp := sctx.GetClient().Send(ctx, kvReq, sctx.GetSessionVars().KVVars, option)
resp := dctx.GetClient().Send(ctx, kvReq, dctx.GetSessionVars().KVVars, option)
if resp == nil {
return nil, errors.New("client returns nil response")
}
label := metrics.LblGeneral
if sctx.GetSessionVars().InRestrictedSQL {
if dctx.GetSessionVars().InRestrictedSQL {
label = metrics.LblInternal
}
@ -106,7 +106,7 @@ func Select(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request, fie
resp: resp,
rowLen: len(fieldTypes),
fieldTypes: fieldTypes,
ctx: sctx,
ctx: dctx,
sqlType: label,
memTracker: kvReq.MemTracker,
storeType: kvReq.StoreType,
@ -116,34 +116,34 @@ func Select(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request, fie
}
// SetTiFlashConfVarsInContext set some TiFlash config variables in context.
func SetTiFlashConfVarsInContext(ctx context.Context, sctx sessionctx.Context) context.Context {
if sctx.GetSessionVars().TiFlashMaxThreads != -1 {
ctx = metadata.AppendToOutgoingContext(ctx, variable.TiDBMaxTiFlashThreads, strconv.FormatInt(sctx.GetSessionVars().TiFlashMaxThreads, 10))
func SetTiFlashConfVarsInContext(ctx context.Context, vars *variable.SessionVars) context.Context {
if vars.TiFlashMaxThreads != -1 {
ctx = metadata.AppendToOutgoingContext(ctx, variable.TiDBMaxTiFlashThreads, strconv.FormatInt(vars.TiFlashMaxThreads, 10))
}
if sctx.GetSessionVars().TiFlashMaxBytesBeforeExternalJoin != -1 {
ctx = metadata.AppendToOutgoingContext(ctx, variable.TiDBMaxBytesBeforeTiFlashExternalJoin, strconv.FormatInt(sctx.GetSessionVars().TiFlashMaxBytesBeforeExternalJoin, 10))
if vars.TiFlashMaxBytesBeforeExternalJoin != -1 {
ctx = metadata.AppendToOutgoingContext(ctx, variable.TiDBMaxBytesBeforeTiFlashExternalJoin, strconv.FormatInt(vars.TiFlashMaxBytesBeforeExternalJoin, 10))
}
if sctx.GetSessionVars().TiFlashMaxBytesBeforeExternalGroupBy != -1 {
ctx = metadata.AppendToOutgoingContext(ctx, variable.TiDBMaxBytesBeforeTiFlashExternalGroupBy, strconv.FormatInt(sctx.GetSessionVars().TiFlashMaxBytesBeforeExternalGroupBy, 10))
if vars.TiFlashMaxBytesBeforeExternalGroupBy != -1 {
ctx = metadata.AppendToOutgoingContext(ctx, variable.TiDBMaxBytesBeforeTiFlashExternalGroupBy, strconv.FormatInt(vars.TiFlashMaxBytesBeforeExternalGroupBy, 10))
}
if sctx.GetSessionVars().TiFlashMaxBytesBeforeExternalSort != -1 {
ctx = metadata.AppendToOutgoingContext(ctx, variable.TiDBMaxBytesBeforeTiFlashExternalSort, strconv.FormatInt(sctx.GetSessionVars().TiFlashMaxBytesBeforeExternalSort, 10))
if vars.TiFlashMaxBytesBeforeExternalSort != -1 {
ctx = metadata.AppendToOutgoingContext(ctx, variable.TiDBMaxBytesBeforeTiFlashExternalSort, strconv.FormatInt(vars.TiFlashMaxBytesBeforeExternalSort, 10))
}
if sctx.GetSessionVars().TiFlashMaxQueryMemoryPerNode <= 0 {
if vars.TiFlashMaxQueryMemoryPerNode <= 0 {
ctx = metadata.AppendToOutgoingContext(ctx, variable.TiFlashMemQuotaQueryPerNode, "0")
} else {
ctx = metadata.AppendToOutgoingContext(ctx, variable.TiFlashMemQuotaQueryPerNode, strconv.FormatInt(sctx.GetSessionVars().TiFlashMaxQueryMemoryPerNode, 10))
ctx = metadata.AppendToOutgoingContext(ctx, variable.TiFlashMemQuotaQueryPerNode, strconv.FormatInt(vars.TiFlashMaxQueryMemoryPerNode, 10))
}
ctx = metadata.AppendToOutgoingContext(ctx, variable.TiFlashQuerySpillRatio, strconv.FormatFloat(sctx.GetSessionVars().TiFlashQuerySpillRatio, 'f', -1, 64))
ctx = metadata.AppendToOutgoingContext(ctx, variable.TiFlashQuerySpillRatio, strconv.FormatFloat(vars.TiFlashQuerySpillRatio, 'f', -1, 64))
return ctx
}
// SelectWithRuntimeStats sends a DAG request, returns SelectResult.
// The difference from Select is that SelectWithRuntimeStats will set copPlanIDs into selectResult,
// which can help selectResult to collect runtime stats.
func SelectWithRuntimeStats(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request,
func SelectWithRuntimeStats(ctx context.Context, dctx distsqlctx.DistSQLContext, kvReq *kv.Request,
fieldTypes []*types.FieldType, copPlanIDs []int, rootPlanID int) (SelectResult, error) {
sr, err := Select(ctx, sctx, kvReq, fieldTypes)
sr, err := Select(ctx, dctx, kvReq, fieldTypes)
if err != nil {
return nil, err
}
@ -198,7 +198,7 @@ func Checksum(ctx context.Context, client kv.Client, kvReq *kv.Request, vars any
// methods are:
// 1. TypeChunk: the result is encoded using the Chunk format, refer util/chunk/chunk.go
// 2. TypeDefault: the result is encoded row by row
func SetEncodeType(ctx sessionctx.Context, dagReq *tipb.DAGRequest) {
func SetEncodeType(ctx distsqlctx.DistSQLContext, dagReq *tipb.DAGRequest) {
if canUseChunkRPC(ctx) {
dagReq.EncodeType = tipb.EncodeType_TypeChunk
setChunkMemoryLayout(dagReq)
@ -207,7 +207,7 @@ func SetEncodeType(ctx sessionctx.Context, dagReq *tipb.DAGRequest) {
}
}
func canUseChunkRPC(ctx sessionctx.Context) bool {
func canUseChunkRPC(ctx distsqlctx.DistSQLContext) bool {
if !ctx.GetSessionVars().EnableChunkRPC {
return false
}

View File

@ -26,13 +26,13 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/config"
dcontext "github.com/pingcap/tidb/pkg/distsql/context"
"github.com/pingcap/tidb/pkg/errno"
"github.com/pingcap/tidb/pkg/expression"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/metrics"
"github.com/pingcap/tidb/pkg/parser/terror"
"github.com/pingcap/tidb/pkg/planner/util"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/stmtctx"
"github.com/pingcap/tidb/pkg/store/copr"
"github.com/pingcap/tidb/pkg/types"
@ -286,7 +286,7 @@ type selectResult struct {
rowLen int
fieldTypes []*types.FieldType
ctx sessionctx.Context
ctx dcontext.DistSQLContext
selectResp *tipb.SelectResponse
selectRespSize int64 // record the selectResp.Size() when it is initialized.

View File

@ -104,6 +104,7 @@ go_library(
"//pkg/ddl/placement",
"//pkg/ddl/schematracker",
"//pkg/distsql",
"//pkg/distsql/context",
"//pkg/disttask/framework/handle",
"//pkg/disttask/framework/proto",
"//pkg/disttask/framework/storage",
@ -131,6 +132,7 @@ go_library(
"//pkg/executor/sortexec",
"//pkg/expression",
"//pkg/expression/aggregation",
"//pkg/expression/context",
"//pkg/infoschema",
"//pkg/keyspace",
"//pkg/kv",
@ -378,6 +380,7 @@ go_test(
"//pkg/ddl/placement",
"//pkg/ddl/util",
"//pkg/distsql",
"//pkg/distsql/context",
"//pkg/domain",
"//pkg/domain/infosync",
"//pkg/errctx",

View File

@ -210,7 +210,7 @@ func (e *AnalyzeColumnsExecV2) decodeSampleDataWithVirtualColumn(
}
}
}
err := table.FillVirtualColumnValue(fieldTps, virtualColIdx, schema.Columns, e.colsInfo, e.ctx, chk)
err := table.FillVirtualColumnValue(fieldTps, virtualColIdx, schema.Columns, e.colsInfo, e.ctx.GetExprCtx(), chk)
if err != nil {
return err
}

View File

@ -201,7 +201,7 @@ func (e *BatchPointGetExec) Next(ctx context.Context, req *chunk.Chunk) error {
e.index++
}
err := table.FillVirtualColumnValue(e.virtualColumnRetFieldTypes, e.virtualColumnIndex, e.Schema().Columns, e.columns, e.Ctx(), req)
err := table.FillVirtualColumnValue(e.virtualColumnRetFieldTypes, e.virtualColumnIndex, e.Schema().Columns, e.columns, e.Ctx().GetExprCtx(), req)
if err != nil {
return err
}

View File

@ -56,6 +56,7 @@ import (
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/parser/mysql"
planctx "github.com/pingcap/tidb/pkg/planner/context"
plannercore "github.com/pingcap/tidb/pkg/planner/core"
plannerutil "github.com/pingcap/tidb/pkg/planner/util"
"github.com/pingcap/tidb/pkg/sessionctx"
@ -1777,11 +1778,11 @@ func (b *executorBuilder) buildTableDual(v *plannercore.PhysicalTableDual) exec.
b.err = errors.Errorf("buildTableDual failed, invalid row count for dual table: %v", v.RowCount)
return nil
}
base := exec.NewBaseExecutor(b.ctx, v.Schema(), v.ID())
base := exec.NewBaseExecutorV2(b.ctx.GetSessionVars(), v.Schema(), v.ID())
base.SetInitCap(v.RowCount)
e := &TableDualExec{
BaseExecutor: base,
numDualRows: v.RowCount,
BaseExecutorV2: base,
numDualRows: v.RowCount,
}
return e
}
@ -2917,7 +2918,7 @@ func (b *executorBuilder) newDataReaderBuilder(p plannercore.PhysicalPlan) (*dat
builderForDataReader.dataReaderTS = ts
return &dataReaderBuilder{
Plan: p,
plan: p,
executorBuilder: &builderForDataReader,
}, nil
}
@ -3206,26 +3207,28 @@ func buildNoRangeTableReader(b *executorBuilder, v *plannercore.PhysicalTableRea
return nil, err
}
paging := b.ctx.GetSessionVars().EnablePaging
e := &TableReaderExecutor{
BaseExecutor: exec.NewBaseExecutor(b.ctx, v.Schema(), v.ID()),
dagPB: dagReq,
startTS: startTS,
txnScope: b.txnScope,
readReplicaScope: b.readReplicaScope,
isStaleness: b.isStaleness,
netDataSize: v.GetNetDataSize(),
table: tbl,
keepOrder: ts.KeepOrder,
desc: ts.Desc,
byItems: ts.ByItems,
columns: ts.Columns,
paging: paging,
corColInFilter: b.corColInDistPlan(v.TablePlans),
corColInAccess: b.corColInAccess(v.TablePlans[0]),
plans: v.TablePlans,
tablePlan: v.GetTablePlan(),
storeType: v.StoreType,
batchCop: v.ReadReqType == plannercore.BatchCop,
BaseExecutorV2: exec.NewBaseExecutorV2(b.ctx.GetSessionVars(), v.Schema(), v.ID()),
tableReaderExecutorContext: newTableReaderExecutorContext(b.ctx),
dagPB: dagReq,
startTS: startTS,
txnScope: b.txnScope,
readReplicaScope: b.readReplicaScope,
isStaleness: b.isStaleness,
netDataSize: v.GetNetDataSize(),
table: tbl,
keepOrder: ts.KeepOrder,
desc: ts.Desc,
byItems: ts.ByItems,
columns: ts.Columns,
paging: paging,
corColInFilter: b.corColInDistPlan(v.TablePlans),
corColInAccess: b.corColInAccess(v.TablePlans[0]),
plans: v.TablePlans,
tablePlan: v.GetTablePlan(),
storeType: v.StoreType,
batchCop: v.ReadReqType == plannercore.BatchCop,
}
e.buildVirtualColumnInfo()
@ -3389,7 +3392,7 @@ func (b *executorBuilder) buildTableReader(v *plannercore.PhysicalTableReader) e
}
if len(partitions) == 0 {
return &TableDualExec{BaseExecutor: ret.BaseExecutor}
return &TableDualExec{BaseExecutorV2: ret.BaseExecutorV2}
}
// Sort the partition is necessary to make the final multiple partition key ranges ordered.
@ -3997,7 +4000,7 @@ func (b *executorBuilder) buildIndexMergeReader(v *plannercore.PhysicalIndexMerg
// 1. dataReaderBuilder calculate data range from argument, rather than plan.
// 2. the result executor is already opened.
type dataReaderBuilder struct {
plannercore.Plan
plan plannercore.Plan
*executorBuilder
selectResultHook // for testing
@ -4021,7 +4024,7 @@ func (*mockPhysicalIndexReader) MemoryUsage() (sum int64) {
func (builder *dataReaderBuilder) buildExecutorForIndexJoin(ctx context.Context, lookUpContents []*indexJoinLookUpContent,
indexRanges []*ranger.Range, keyOff2IdxOff []int, cwc *plannercore.ColWithCmpFuncManager, canReorderHandles bool, memTracker *memory.Tracker, interruptSignal *atomic.Value) (exec.Executor, error) {
return builder.buildExecutorForIndexJoinInternal(ctx, builder.Plan, lookUpContents, indexRanges, keyOff2IdxOff, cwc, canReorderHandles, memTracker, interruptSignal)
return builder.buildExecutorForIndexJoinInternal(ctx, builder.plan, lookUpContents, indexRanges, keyOff2IdxOff, cwc, canReorderHandles, memTracker, interruptSignal)
}
func (builder *dataReaderBuilder) buildExecutorForIndexJoinInternal(ctx context.Context, plan plannercore.Plan, lookUpContents []*indexJoinLookUpContent,
@ -4092,7 +4095,7 @@ func (builder *dataReaderBuilder) buildTableReaderForIndexJoin(ctx context.Conte
tbInfo := e.table.Meta()
if tbInfo.GetPartitionInfo() == nil || !builder.ctx.GetSessionVars().StmtCtx.UseDynamicPartitionPrune() {
if v.IsCommonHandle {
kvRanges, err := buildKvRangesForIndexJoin(e.Ctx(), getPhysicalTableID(e.table), -1, lookUpContents, indexRanges, keyOff2IdxOff, cwc, memTracker, interruptSignal)
kvRanges, err := buildKvRangesForIndexJoin(e.GetSessionVars().StmtCtx, e.pctx, getPhysicalTableID(e.table), -1, lookUpContents, indexRanges, keyOff2IdxOff, cwc, memTracker, interruptSignal)
if err != nil {
return nil, err
}
@ -4122,7 +4125,7 @@ func (builder *dataReaderBuilder) buildTableReaderForIndexJoin(ctx context.Conte
kvRanges = make([]kv.KeyRange, 0, len(lookUpContents))
// lookUpContentsByPID groups lookUpContents by pid(partition) so that kv ranges for same partition can be merged.
lookUpContentsByPID := make(map[int64][]*indexJoinLookUpContent)
exprCtx := e.Ctx().GetExprCtx()
exprCtx := e.ectx
for _, content := range lookUpContents {
for i, data := range content.keys {
locateKey[keyColOffsets[i]] = data
@ -4142,7 +4145,7 @@ func (builder *dataReaderBuilder) buildTableReaderForIndexJoin(ctx context.Conte
}
for pid, contents := range lookUpContentsByPID {
// buildKvRanges for each partition.
tmp, err := buildKvRangesForIndexJoin(e.Ctx(), pid, -1, contents, indexRanges, keyOff2IdxOff, cwc, nil, interruptSignal)
tmp, err := buildKvRangesForIndexJoin(e.GetSessionVars().StmtCtx, e.pctx, pid, -1, contents, indexRanges, keyOff2IdxOff, cwc, nil, interruptSignal)
if err != nil {
return nil, err
}
@ -4151,7 +4154,7 @@ func (builder *dataReaderBuilder) buildTableReaderForIndexJoin(ctx context.Conte
} else {
kvRanges = make([]kv.KeyRange, 0, len(usedPartitions)*len(lookUpContents))
for _, p := range usedPartitionList {
tmp, err := buildKvRangesForIndexJoin(e.Ctx(), p.GetPhysicalID(), -1, lookUpContents, indexRanges, keyOff2IdxOff, cwc, memTracker, interruptSignal)
tmp, err := buildKvRangesForIndexJoin(e.GetSessionVars().StmtCtx, e.pctx, p.GetPhysicalID(), -1, lookUpContents, indexRanges, keyOff2IdxOff, cwc, memTracker, interruptSignal)
if err != nil {
return nil, err
}
@ -4170,7 +4173,7 @@ func (builder *dataReaderBuilder) buildTableReaderForIndexJoin(ctx context.Conte
if len(keyColOffsets) > 0 {
locateKey := make([]types.Datum, len(pt.Cols()))
kvRanges = make([]kv.KeyRange, 0, len(lookUpContents))
exprCtx := e.Ctx().GetExprCtx()
exprCtx := e.ectx
for _, content := range lookUpContents {
for i, data := range content.keys {
locateKey[keyColOffsets[i]] = data
@ -4267,13 +4270,13 @@ func (h kvRangeBuilderFromRangeAndPartition) buildKeyRange(ranges []*ranger.Rang
// newClosestReadAdjuster let the request be sent to closest replica(within the same zone)
// if response size exceeds certain threshold.
func newClosestReadAdjuster(ctx sessionctx.Context, req *kv.Request, netDataSize float64) kv.CoprRequestAdjuster {
func newClosestReadAdjuster(vars *variable.SessionVars, req *kv.Request, netDataSize float64) kv.CoprRequestAdjuster {
if req.ReplicaRead != kv.ReplicaReadClosestAdaptive {
return nil
}
return func(req *kv.Request, copTaskCount int) bool {
// copTaskCount is the number of coprocessor requests
if int64(netDataSize/float64(copTaskCount)) >= ctx.GetSessionVars().ReplicaClosestReadThreshold {
if int64(netDataSize/float64(copTaskCount)) >= vars.ReplicaClosestReadThreshold {
req.MatchStoreLabels = append(req.MatchStoreLabels, &metapb.StoreLabel{
Key: placement.DCLabelKey,
Value: config.GetTxnScopeFromConfig(),
@ -4299,11 +4302,11 @@ func (builder *dataReaderBuilder) buildTableReaderBase(ctx context.Context, e *T
SetTxnScope(e.txnScope).
SetReadReplicaScope(e.readReplicaScope).
SetIsStaleness(e.isStaleness).
SetFromSessionVars(e.Ctx().GetSessionVars()).
SetFromInfoSchema(e.Ctx().GetInfoSchema()).
SetClosestReplicaReadAdjuster(newClosestReadAdjuster(e.Ctx(), &reqBuilderWithRange.Request, e.netDataSize)).
SetFromSessionVars(e.GetSessionVars()).
SetFromInfoSchema(e.GetInfoSchema()).
SetClosestReplicaReadAdjuster(newClosestReadAdjuster(e.GetSessionVars(), &reqBuilderWithRange.Request, e.netDataSize)).
SetPaging(e.paging).
SetConnIDAndConnAlias(e.Ctx().GetSessionVars().ConnectionID, e.Ctx().GetSessionVars().SessionAlias).
SetConnIDAndConnAlias(e.GetSessionVars().ConnectionID, e.GetSessionVars().SessionAlias).
Build()
if err != nil {
return nil, err
@ -4351,7 +4354,7 @@ func (builder *dataReaderBuilder) buildIndexReaderForIndexJoin(ctx context.Conte
}
tbInfo := e.table.Meta()
if tbInfo.GetPartitionInfo() == nil || !builder.ctx.GetSessionVars().StmtCtx.UseDynamicPartitionPrune() {
kvRanges, err := buildKvRangesForIndexJoin(e.Ctx(), e.physicalTableID, e.index.ID, lookUpContents, indexRanges, keyOff2IdxOff, cwc, memoryTracker, interruptSignal)
kvRanges, err := buildKvRangesForIndexJoin(e.Ctx().GetSessionVars().StmtCtx, e.Ctx().GetPlanCtx(), e.physicalTableID, e.index.ID, lookUpContents, indexRanges, keyOff2IdxOff, cwc, memoryTracker, interruptSignal)
if err != nil {
return nil, err
}
@ -4408,7 +4411,7 @@ func (builder *dataReaderBuilder) buildIndexReaderForIndexJoin(ctx context.Conte
}
return e, nil
}
ret := &TableDualExec{BaseExecutor: e.BaseExecutor}
ret := &TableDualExec{BaseExecutorV2: e.BaseExecutorV2}
err = exec.Open(ctx, ret)
return ret, err
}
@ -4422,7 +4425,7 @@ func (builder *dataReaderBuilder) buildIndexLookUpReaderForIndexJoin(ctx context
tbInfo := e.table.Meta()
if tbInfo.GetPartitionInfo() == nil || !builder.ctx.GetSessionVars().StmtCtx.UseDynamicPartitionPrune() {
e.kvRanges, err = buildKvRangesForIndexJoin(e.Ctx(), getPhysicalTableID(e.table), e.index.ID, lookUpContents, indexRanges, keyOff2IdxOff, cwc, memTracker, interruptSignal)
e.kvRanges, err = buildKvRangesForIndexJoin(e.Ctx().GetSessionVars().StmtCtx, e.Ctx().GetPlanCtx(), getPhysicalTableID(e.table), e.index.ID, lookUpContents, indexRanges, keyOff2IdxOff, cwc, memTracker, interruptSignal)
if err != nil {
return nil, err
}
@ -4482,7 +4485,7 @@ func (builder *dataReaderBuilder) buildIndexLookUpReaderForIndexJoin(ctx context
}
return e, err
}
ret := &TableDualExec{BaseExecutor: e.BaseExecutor}
ret := &TableDualExec{BaseExecutorV2: e.BaseExecutorV2}
err = exec.Open(ctx, ret)
return ret, err
}
@ -4565,14 +4568,13 @@ func buildRangesForIndexJoin(ctx sessionctx.Context, lookUpContents []*indexJoin
}
// buildKvRangesForIndexJoin builds kv ranges for index join when the inner plan is index scan plan.
func buildKvRangesForIndexJoin(ctx sessionctx.Context, tableID, indexID int64, lookUpContents []*indexJoinLookUpContent,
func buildKvRangesForIndexJoin(sc *stmtctx.StatementContext, pctx planctx.PlanContext, tableID, indexID int64, lookUpContents []*indexJoinLookUpContent,
ranges []*ranger.Range, keyOff2IdxOff []int, cwc *plannercore.ColWithCmpFuncManager, memTracker *memory.Tracker, interruptSignal *atomic.Value) (_ []kv.KeyRange, err error) {
kvRanges := make([]kv.KeyRange, 0, len(ranges)*len(lookUpContents))
if len(ranges) == 0 {
return []kv.KeyRange{}, nil
}
lastPos := len(ranges[0].LowVal) - 1
sc := ctx.GetSessionVars().StmtCtx
tmpDatumRanges := make([]*ranger.Range, 0, len(lookUpContents))
for _, content := range lookUpContents {
for _, ran := range ranges {
@ -4596,7 +4598,7 @@ func buildKvRangesForIndexJoin(ctx sessionctx.Context, tableID, indexID int64, l
kvRanges = tmpKvRanges.AppendSelfTo(kvRanges)
continue
}
nextColRanges, err := cwc.BuildRangesByRow(ctx.GetPlanCtx(), content.row)
nextColRanges, err := cwc.BuildRangesByRow(pctx, content.row)
if err != nil {
return nil, err
}
@ -4627,16 +4629,16 @@ func buildKvRangesForIndexJoin(ctx sessionctx.Context, tableID, indexID int64, l
return kvRanges, nil
}
tmpDatumRanges, err = ranger.UnionRanges(ctx.GetPlanCtx(), tmpDatumRanges, true)
tmpDatumRanges, err = ranger.UnionRanges(pctx, tmpDatumRanges, true)
if err != nil {
return nil, err
}
// Index id is -1 means it's a common handle.
if indexID == -1 {
tmpKeyRanges, err := distsql.CommonHandleRangesToKVRanges(ctx.GetSessionVars().StmtCtx, []int64{tableID}, tmpDatumRanges)
tmpKeyRanges, err := distsql.CommonHandleRangesToKVRanges(sc, []int64{tableID}, tmpDatumRanges)
return tmpKeyRanges.FirstPartitionRange(), err
}
tmpKeyRanges, err := distsql.IndexRangesToKVRangesWithInterruptSignal(ctx.GetSessionVars().StmtCtx, tableID, indexID, tmpDatumRanges, memTracker, interruptSignal)
tmpKeyRanges, err := distsql.IndexRangesToKVRangesWithInterruptSignal(sc, tableID, indexID, tmpDatumRanges, memTracker, interruptSignal)
return tmpKeyRanges.FirstPartitionRange(), err
}

View File

@ -312,7 +312,7 @@ func (e *IndexReaderExecutor) buildKVReq(r []kv.KeyRange) (*kv.Request, error) {
SetFromSessionVars(e.Ctx().GetSessionVars()).
SetFromInfoSchema(e.Ctx().GetInfoSchema()).
SetMemTracker(e.memTracker).
SetClosestReplicaReadAdjuster(newClosestReadAdjuster(e.Ctx(), &builder.Request, e.netDataSize)).
SetClosestReplicaReadAdjuster(newClosestReadAdjuster(e.Ctx().GetSessionVars(), &builder.Request, e.netDataSize)).
SetConnIDAndConnAlias(e.Ctx().GetSessionVars().ConnectionID, e.Ctx().GetSessionVars().SessionAlias)
kvReq, err := builder.Build()
return kvReq, err
@ -321,7 +321,7 @@ func (e *IndexReaderExecutor) buildKVReq(r []kv.KeyRange) (*kv.Request, error) {
func (e *IndexReaderExecutor) open(ctx context.Context, kvRanges []kv.KeyRange) error {
var err error
if e.corColInFilter {
e.dagPB.Executors, err = builder.ConstructListBasedDistExec(e.Ctx(), e.plans)
e.dagPB.Executors, err = builder.ConstructListBasedDistExec(e.Ctx().GetPlanCtx(), e.plans)
if err != nil {
return err
}
@ -583,14 +583,14 @@ func (e *IndexLookUpExecutor) open(_ context.Context) error {
var err error
if e.corColInIdxSide {
e.dagPB.Executors, err = builder.ConstructListBasedDistExec(e.Ctx(), e.idxPlans)
e.dagPB.Executors, err = builder.ConstructListBasedDistExec(e.Ctx().GetPlanCtx(), e.idxPlans)
if err != nil {
return err
}
}
if e.corColInTblSide {
e.tableRequest.Executors, err = builder.ConstructListBasedDistExec(e.Ctx(), e.tblPlans)
e.tableRequest.Executors, err = builder.ConstructListBasedDistExec(e.Ctx().GetPlanCtx(), e.tblPlans)
if err != nil {
return err
}
@ -716,7 +716,7 @@ func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, workCh chan<
SetIsStaleness(e.isStaleness).
SetFromSessionVars(e.Ctx().GetSessionVars()).
SetFromInfoSchema(e.Ctx().GetInfoSchema()).
SetClosestReplicaReadAdjuster(newClosestReadAdjuster(e.Ctx(), &builder.Request, e.idxNetDataSize/float64(len(kvRanges)))).
SetClosestReplicaReadAdjuster(newClosestReadAdjuster(e.Ctx().GetSessionVars(), &builder.Request, e.idxNetDataSize/float64(len(kvRanges)))).
SetMemTracker(tracker).
SetConnIDAndConnAlias(e.Ctx().GetSessionVars().ConnectionID, e.Ctx().GetSessionVars().SessionAlias)
@ -804,18 +804,19 @@ func (e *IndexLookUpExecutor) buildTableReader(ctx context.Context, task *lookup
table = task.partitionTable
}
tableReaderExec := &TableReaderExecutor{
BaseExecutor: exec.NewBaseExecutor(e.Ctx(), e.Schema(), e.getTableRootPlanID()),
table: table,
dagPB: e.tableRequest,
startTS: e.startTS,
txnScope: e.txnScope,
readReplicaScope: e.readReplicaScope,
isStaleness: e.isStaleness,
columns: e.columns,
corColInFilter: e.corColInTblSide,
plans: e.tblPlans,
netDataSize: e.avgRowSize * float64(len(task.handles)),
byItems: e.byItems,
BaseExecutorV2: exec.NewBaseExecutorV2(e.Ctx().GetSessionVars(), e.Schema(), e.getTableRootPlanID()),
tableReaderExecutorContext: newTableReaderExecutorContext(e.Ctx()),
table: table,
dagPB: e.tableRequest,
startTS: e.startTS,
txnScope: e.txnScope,
readReplicaScope: e.readReplicaScope,
isStaleness: e.isStaleness,
columns: e.columns,
corColInFilter: e.corColInTblSide,
plans: e.tblPlans,
netDataSize: e.avgRowSize * float64(len(task.handles)),
byItems: e.byItems,
}
tableReaderExec.buildVirtualColumnInfo()
tableReader, err := e.dataReaderBuilder.buildTableReaderFromHandles(ctx, tableReaderExec, task.handles, true)

View File

@ -1506,7 +1506,7 @@ func init() {
// TableDualExec represents a dual table executor.
type TableDualExec struct {
exec.BaseExecutor
exec.BaseExecutorV2
// numDualRows can only be 0 or 1.
numDualRows int

View File

@ -65,7 +65,7 @@ func TestBuildKvRangesForIndexJoinWithoutCwc(t *testing.T) {
keyOff2IdxOff := []int{1, 3}
ctx := mock.NewContext()
kvRanges, err := buildKvRangesForIndexJoin(ctx, 0, 0, joinKeyRows, indexRanges, keyOff2IdxOff, nil, nil, nil)
kvRanges, err := buildKvRangesForIndexJoin(ctx.GetSessionVars().StmtCtx, ctx.GetPlanCtx(), 0, 0, joinKeyRows, indexRanges, keyOff2IdxOff, nil, nil, nil)
require.NoError(t, err)
// Check the kvRanges is in order.
for i, kvRange := range kvRanges {
@ -95,7 +95,7 @@ func TestBuildKvRangesForIndexJoinWithoutCwcAndWithMemoryTracker(t *testing.T) {
keyOff2IdxOff := []int{1, 3}
ctx := mock.NewContext()
memTracker := memory.NewTracker(memory.LabelForIndexWorker, -1)
kvRanges, err := buildKvRangesForIndexJoin(ctx, 0, 0, joinKeyRows, indexRanges, keyOff2IdxOff, nil, memTracker, nil)
kvRanges, err := buildKvRangesForIndexJoin(ctx.GetSessionVars().StmtCtx, ctx.GetPlanCtx(), 0, 0, joinKeyRows, indexRanges, keyOff2IdxOff, nil, memTracker, nil)
require.NoError(t, err)
// Check the kvRanges is in order.
for i, kvRange := range kvRanges {
@ -117,7 +117,7 @@ func TestBuildKvRangesForIndexJoinWithoutCwcAndWithMemoryTracker(t *testing.T) {
keyOff2IdxOff := []int{1, 3}
ctx := mock.NewContext()
memTracker := memory.NewTracker(memory.LabelForIndexWorker, -1)
kvRanges, err := buildKvRangesForIndexJoin(ctx, 0, 0, joinKeyRows, indexRanges, keyOff2IdxOff, nil, memTracker, nil)
kvRanges, err := buildKvRangesForIndexJoin(ctx.GetSessionVars().StmtCtx, ctx.GetPlanCtx(), 0, 0, joinKeyRows, indexRanges, keyOff2IdxOff, nil, memTracker, nil)
require.NoError(t, err)
// Check the kvRanges is in order.
for i, kvRange := range kvRanges {

View File

@ -167,7 +167,7 @@ func (e *IndexMergeReaderExecutor) Open(_ context.Context) (err error) {
e.keyRanges = make([][]kv.KeyRange, 0, len(e.partialPlans))
e.initRuntimeStats()
if e.isCorColInTableFilter {
e.tableRequest.Executors, err = builder.ConstructListBasedDistExec(e.Ctx(), e.tblPlans)
e.tableRequest.Executors, err = builder.ConstructListBasedDistExec(e.Ctx().GetPlanCtx(), e.tblPlans)
if err != nil {
return err
}
@ -370,7 +370,7 @@ func (e *IndexMergeReaderExecutor) startPartialIndexWorker(ctx context.Context,
if e.isCorColInPartialFilters[workID] {
// We got correlated column, so need to refresh Selection operator.
var err error
if e.dagPBs[workID].Executors, err = builder.ConstructListBasedDistExec(e.Ctx(), e.partialPlans[workID]); err != nil {
if e.dagPBs[workID].Executors, err = builder.ConstructListBasedDistExec(e.Ctx().GetPlanCtx(), e.partialPlans[workID]); err != nil {
syncErr(ctx, e.finished, fetchCh, err)
return
}
@ -388,7 +388,7 @@ func (e *IndexMergeReaderExecutor) startPartialIndexWorker(ctx context.Context,
SetMemTracker(e.memTracker).
SetPaging(e.paging).
SetFromInfoSchema(e.Ctx().GetInfoSchema()).
SetClosestReplicaReadAdjuster(newClosestReadAdjuster(e.Ctx(), &builder.Request, e.partialNetDataSizes[workID])).
SetClosestReplicaReadAdjuster(newClosestReadAdjuster(e.Ctx().GetSessionVars(), &builder.Request, e.partialNetDataSizes[workID])).
SetConnIDAndConnAlias(e.Ctx().GetSessionVars().ConnectionID, e.Ctx().GetSessionVars().SessionAlias)
tps := worker.getRetTpsForIndexScan(e.handleCols)
@ -474,17 +474,18 @@ func (e *IndexMergeReaderExecutor) startPartialTableWorker(ctx context.Context,
failpoint.Inject("testIndexMergePanicPartialTableWorker", nil)
var err error
partialTableReader := &TableReaderExecutor{
BaseExecutor: exec.NewBaseExecutor(e.Ctx(), ts.Schema(), e.getPartitalPlanID(workID)),
dagPB: e.dagPBs[workID],
startTS: e.startTS,
txnScope: e.txnScope,
readReplicaScope: e.readReplicaScope,
isStaleness: e.isStaleness,
plans: e.partialPlans[workID],
ranges: e.ranges[workID],
netDataSize: e.partialNetDataSizes[workID],
keepOrder: ts.KeepOrder,
byItems: ts.ByItems,
BaseExecutorV2: exec.NewBaseExecutorV2(e.Ctx().GetSessionVars(), ts.Schema(), e.getPartitalPlanID(workID)),
tableReaderExecutorContext: newTableReaderExecutorContext(e.Ctx()),
dagPB: e.dagPBs[workID],
startTS: e.startTS,
txnScope: e.txnScope,
readReplicaScope: e.readReplicaScope,
isStaleness: e.isStaleness,
plans: e.partialPlans[workID],
ranges: e.ranges[workID],
netDataSize: e.partialNetDataSizes[workID],
keepOrder: ts.KeepOrder,
byItems: ts.ByItems,
}
worker := &partialTableWorker{
@ -512,7 +513,7 @@ func (e *IndexMergeReaderExecutor) startPartialTableWorker(ctx context.Context,
}
if e.isCorColInPartialFilters[workID] {
if e.dagPBs[workID].Executors, err = builder.ConstructListBasedDistExec(e.Ctx(), e.partialPlans[workID]); err != nil {
if e.dagPBs[workID].Executors, err = builder.ConstructListBasedDistExec(e.Ctx().GetPlanCtx(), e.partialPlans[workID]); err != nil {
syncErr(ctx, e.finished, fetchCh, err)
return
}
@ -785,16 +786,17 @@ func (e *IndexMergeReaderExecutor) startIndexMergeTableScanWorker(ctx context.Co
func (e *IndexMergeReaderExecutor) buildFinalTableReader(ctx context.Context, tbl table.Table, handles []kv.Handle) (_ exec.Executor, err error) {
tableReaderExec := &TableReaderExecutor{
BaseExecutor: exec.NewBaseExecutor(e.Ctx(), e.Schema(), e.getTablePlanRootID()),
table: tbl,
dagPB: e.tableRequest,
startTS: e.startTS,
txnScope: e.txnScope,
readReplicaScope: e.readReplicaScope,
isStaleness: e.isStaleness,
columns: e.columns,
plans: e.tblPlans,
netDataSize: e.dataAvgRowSize * float64(len(handles)),
BaseExecutorV2: exec.NewBaseExecutorV2(e.Ctx().GetSessionVars(), e.Schema(), e.getTablePlanRootID()),
tableReaderExecutorContext: newTableReaderExecutorContext(e.Ctx()),
table: tbl,
dagPB: e.tableRequest,
startTS: e.startTS,
txnScope: e.txnScope,
readReplicaScope: e.readReplicaScope,
isStaleness: e.isStaleness,
columns: e.columns,
plans: e.tblPlans,
netDataSize: e.dataAvgRowSize * float64(len(handles)),
}
tableReaderExec.buildVirtualColumnInfo()
// Reorder handles because SplitKeyRangesByLocations() requires startKey of kvRanges is ordered.

View File

@ -8,6 +8,7 @@ go_library(
deps = [
"//pkg/distsql",
"//pkg/kv",
"//pkg/planner/context",
"//pkg/planner/core",
"//pkg/sessionctx",
"//pkg/util/timeutil",

View File

@ -17,6 +17,7 @@ package builder
import (
"github.com/pingcap/tidb/pkg/distsql"
"github.com/pingcap/tidb/pkg/kv"
planctx "github.com/pingcap/tidb/pkg/planner/context"
plannercore "github.com/pingcap/tidb/pkg/planner/core"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/util/timeutil"
@ -24,16 +25,16 @@ import (
)
// ConstructTreeBasedDistExec constructs tree based DAGRequest
func ConstructTreeBasedDistExec(sctx sessionctx.Context, p plannercore.PhysicalPlan) ([]*tipb.Executor, error) {
execPB, err := p.ToPB(sctx.GetPlanCtx(), kv.TiFlash)
func ConstructTreeBasedDistExec(pctx planctx.PlanContext, p plannercore.PhysicalPlan) ([]*tipb.Executor, error) {
execPB, err := p.ToPB(pctx, kv.TiFlash)
return []*tipb.Executor{execPB}, err
}
// ConstructListBasedDistExec constructs list based DAGRequest
func ConstructListBasedDistExec(sctx sessionctx.Context, plans []plannercore.PhysicalPlan) ([]*tipb.Executor, error) {
func ConstructListBasedDistExec(pctx planctx.PlanContext, plans []plannercore.PhysicalPlan) ([]*tipb.Executor, error) {
executors := make([]*tipb.Executor, 0, len(plans))
for _, p := range plans {
execPB, err := p.ToPB(sctx.GetPlanCtx(), kv.TiKV)
execPB, err := p.ToPB(pctx, kv.TiKV)
if err != nil {
return nil, err
}
@ -54,10 +55,10 @@ func ConstructDAGReq(ctx sessionctx.Context, plans []plannercore.PhysicalPlan, s
dagReq.Flags = sc.PushDownFlags()
if storeType == kv.TiFlash {
var executors []*tipb.Executor
executors, err = ConstructTreeBasedDistExec(ctx, plans[0])
executors, err = ConstructTreeBasedDistExec(ctx.GetPlanCtx(), plans[0])
dagReq.RootExecutor = executors[0]
} else {
dagReq.Executors, err = ConstructListBasedDistExec(ctx, plans)
dagReq.Executors, err = ConstructListBasedDistExec(ctx.GetPlanCtx(), plans)
}
distsql.SetEncodeType(ctx, dagReq)

View File

@ -11,6 +11,7 @@ go_library(
deps = [
"//pkg/domain",
"//pkg/expression",
"//pkg/parser",
"//pkg/sessionctx",
"//pkg/sessionctx/stmtctx",
"//pkg/sessionctx/variable",
@ -26,6 +27,7 @@ go_library(
"//pkg/util/topsql/state",
"//pkg/util/tracing",
"@com_github_ngaut_pools//:pools",
"@org_uber_go_atomic//:atomic",
],
)

View File

@ -22,7 +22,9 @@ import (
"github.com/ngaut/pools"
"github.com/pingcap/tidb/pkg/domain"
"github.com/pingcap/tidb/pkg/expression"
"github.com/pingcap/tidb/pkg/parser"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/stmtctx"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util"
@ -32,6 +34,7 @@ import (
"github.com/pingcap/tidb/pkg/util/topsql"
topsqlstate "github.com/pingcap/tidb/pkg/util/topsql/state"
"github.com/pingcap/tidb/pkg/util/tracing"
"go.uber.org/atomic"
)
// Executor is the physical implementation of an algebra operator.
@ -67,34 +70,68 @@ type Executor interface {
var _ Executor = &BaseExecutor{}
// BaseExecutor holds common information for executors.
type BaseExecutor struct {
ctx sessionctx.Context
// executorChunkAllocator is a helper to implement `Chunk` related methods in `Executor` interface
type executorChunkAllocator struct {
AllocPool chunk.Allocator
schema *expression.Schema // output schema
runtimeStats *execdetails.BasicRuntimeStats
children []Executor
retFieldTypes []*types.FieldType
id int
initCap int
maxChunkSize int
}
// NewBaseExecutor creates a new BaseExecutor instance.
func NewBaseExecutor(ctx sessionctx.Context, schema *expression.Schema, id int, children ...Executor) BaseExecutor {
e := BaseExecutor{
children: children,
ctx: ctx,
id: id,
schema: schema,
initCap: ctx.GetSessionVars().InitChunkSize,
maxChunkSize: ctx.GetSessionVars().MaxChunkSize,
AllocPool: ctx.GetSessionVars().GetChunkAllocator(),
// newExecutorChunkAllocator creates a new `executorChunkAllocator`
func newExecutorChunkAllocator(vars *variable.SessionVars, retFieldTypes []*types.FieldType) executorChunkAllocator {
return executorChunkAllocator{
AllocPool: vars.GetChunkAllocator(),
initCap: vars.InitChunkSize,
maxChunkSize: vars.MaxChunkSize,
retFieldTypes: retFieldTypes,
}
if ctx.GetSessionVars().StmtCtx.RuntimeStatsColl != nil {
if e.id > 0 {
e.runtimeStats = e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.GetBasicRuntimeStats(id)
}
}
// InitCap returns the initial capacity for chunk
func (e *executorChunkAllocator) InitCap() int {
return e.initCap
}
// SetInitCap sets the initial capacity for chunk
func (e *executorChunkAllocator) SetInitCap(c int) {
e.initCap = c
}
// MaxChunkSize returns the max chunk size.
func (e *executorChunkAllocator) MaxChunkSize() int {
return e.maxChunkSize
}
// SetMaxChunkSize sets the max chunk size.
func (e *executorChunkAllocator) SetMaxChunkSize(size int) {
e.maxChunkSize = size
}
// NewChunk creates a new chunk according to the executor configuration
func (e *executorChunkAllocator) NewChunk() *chunk.Chunk {
return e.NewChunkWithCapacity(e.retFieldTypes, e.InitCap(), e.MaxChunkSize())
}
// NewChunkWithCapacity allows the caller to allocate the chunk with any types, capacity and max size in the pool
func (e *executorChunkAllocator) NewChunkWithCapacity(fields []*types.FieldType, capacity int, maxCachesize int) *chunk.Chunk {
return e.AllocPool.Alloc(fields, capacity, maxCachesize)
}
// executorMeta is a helper to store metadata for an execturo and implement the getter
type executorMeta struct {
schema *expression.Schema
children []Executor
retFieldTypes []*types.FieldType
id int
}
// newExecutorMeta creates a new `executorMeta`
func newExecutorMeta(schema *expression.Schema, id int, children ...Executor) executorMeta {
e := executorMeta{
id: id,
schema: schema,
children: children,
}
if schema != nil {
cols := schema.Columns
@ -106,68 +143,141 @@ func NewBaseExecutor(ctx sessionctx.Context, schema *expression.Schema, id int,
return e
}
// RuntimeStats returns the runtime stats of an executor.
func (e *BaseExecutor) RuntimeStats() *execdetails.BasicRuntimeStats {
return e.runtimeStats
// NewChunkWithCapacity allows the caller to allocate the chunk with any types, capacity and max size in the pool
func (e *executorMeta) RetFieldTypes() []*types.FieldType {
return e.retFieldTypes
}
// ID returns the id of an executor.
func (e *BaseExecutor) ID() int {
func (e *executorMeta) ID() int {
return e.id
}
// AllChildren returns all children.
func (e *BaseExecutor) AllChildren() []Executor {
func (e *executorMeta) AllChildren() []Executor {
return e.children
}
// ChildrenLen returns the length of children.
func (e *BaseExecutor) ChildrenLen() int {
func (e *executorMeta) ChildrenLen() int {
return len(e.children)
}
// EmptyChildren judges whether the children is empty.
func (e *BaseExecutor) EmptyChildren() bool {
func (e *executorMeta) EmptyChildren() bool {
return len(e.children) == 0
}
// SetChildren sets the children for an executor.
func (e *BaseExecutor) SetChildren(idx int, ex Executor) {
func (e *executorMeta) SetChildren(idx int, ex Executor) {
e.children[idx] = ex
}
// Children returns the children for an executor.
func (e *BaseExecutor) Children(idx int) Executor {
func (e *executorMeta) Children(idx int) Executor {
return e.children[idx]
}
// RetFieldTypes returns the return field types of an executor.
func (e *BaseExecutor) RetFieldTypes() []*types.FieldType {
return e.retFieldTypes
// Schema returns the current BaseExecutor's schema. If it is nil, then create and return a new one.
func (e *executorMeta) Schema() *expression.Schema {
if e.schema == nil {
return expression.NewSchema()
}
return e.schema
}
// InitCap returns the initial capacity for chunk
func (e *BaseExecutor) InitCap() int {
return e.initCap
// GetSchema gets the schema.
func (e *executorMeta) GetSchema() *expression.Schema {
return e.schema
}
// SetInitCap sets the initial capacity for chunk
func (e *BaseExecutor) SetInitCap(c int) {
e.initCap = c
// executorStats is a helper to implement the stats related methods for `Executor`
type executorStats struct {
runtimeStats *execdetails.BasicRuntimeStats
isSQLAndPlanRegistered *atomic.Bool
sqlDigest *parser.Digest
planDigest *parser.Digest
normalizedSQL string
normalizedPlan string
inRestrictedSQL bool
}
// MaxChunkSize returns the max chunk size.
func (e *BaseExecutor) MaxChunkSize() int {
return e.maxChunkSize
// newExecutorStats creates a new `executorStats`
func newExecutorStats(stmtCtx *stmtctx.StatementContext, id int) executorStats {
normalizedSQL, sqlDigest := stmtCtx.SQLDigest()
normalizedPlan, planDigest := stmtCtx.GetPlanDigest()
e := executorStats{
isSQLAndPlanRegistered: &stmtCtx.IsSQLAndPlanRegistered,
normalizedSQL: normalizedSQL,
sqlDigest: sqlDigest,
normalizedPlan: normalizedPlan,
planDigest: planDigest,
inRestrictedSQL: stmtCtx.InRestrictedSQL,
}
if stmtCtx.RuntimeStatsColl != nil {
if id > 0 {
e.runtimeStats = stmtCtx.RuntimeStatsColl.GetBasicRuntimeStats(id)
}
}
return e
}
// SetMaxChunkSize sets the max chunk size.
func (e *BaseExecutor) SetMaxChunkSize(size int) {
e.maxChunkSize = size
// RuntimeStats returns the runtime stats of an executor.
func (e *executorStats) RuntimeStats() *execdetails.BasicRuntimeStats {
return e.runtimeStats
}
// RegisterSQLAndPlanInExecForTopSQL registers the current SQL and Plan on top sql
func (e *executorStats) RegisterSQLAndPlanInExecForTopSQL() {
if topsqlstate.TopSQLEnabled() && e.isSQLAndPlanRegistered.CompareAndSwap(false, true) {
topsql.RegisterSQL(e.normalizedSQL, e.sqlDigest, e.inRestrictedSQL)
if len(e.normalizedPlan) > 0 {
topsql.RegisterPlan(e.normalizedPlan, e.planDigest)
}
}
}
type signalHandler interface {
HandleSignal() error
}
// executorKillerHandler is a helper to implement the killer related methods for `Executor`.
type executorKillerHandler struct {
handler signalHandler
}
func (e *executorKillerHandler) HandleSQLKillerSignal() error {
return e.handler.HandleSignal()
}
func newExecutorKillerHandler(handler signalHandler) executorKillerHandler {
return executorKillerHandler{handler}
}
// BaseExecutorV2 is a simplified version of `BaseExecutor`, which doesn't contain a full session context
type BaseExecutorV2 struct {
executorMeta
executorKillerHandler
executorStats
executorChunkAllocator
}
// NewBaseExecutorV2 creates a new BaseExecutorV2 instance.
func NewBaseExecutorV2(vars *variable.SessionVars, schema *expression.Schema, id int, children ...Executor) BaseExecutorV2 {
executorMeta := newExecutorMeta(schema, id, children...)
e := BaseExecutorV2{
executorMeta: executorMeta,
executorStats: newExecutorStats(vars.StmtCtx, id),
executorChunkAllocator: newExecutorChunkAllocator(vars, executorMeta.RetFieldTypes()),
executorKillerHandler: newExecutorKillerHandler(&vars.SQLKiller),
}
return e
}
// Open initializes children recursively and "childrenResults" according to children's schemas.
func (e *BaseExecutor) Open(ctx context.Context) error {
func (e *BaseExecutorV2) Open(ctx context.Context) error {
for _, child := range e.children {
err := Open(ctx, child)
if err != nil {
@ -178,7 +288,7 @@ func (e *BaseExecutor) Open(ctx context.Context) error {
}
// Close closes all executors and release all resources.
func (e *BaseExecutor) Close() error {
func (e *BaseExecutorV2) Close() error {
var firstErr error
for _, src := range e.children {
if err := Close(src); err != nil && firstErr == nil {
@ -188,17 +298,24 @@ func (e *BaseExecutor) Close() error {
return firstErr
}
// Schema returns the current BaseExecutor's schema. If it is nil, then create and return a new one.
func (e *BaseExecutor) Schema() *expression.Schema {
if e.schema == nil {
return expression.NewSchema()
}
return e.schema
// Next fills multiple rows into a chunk.
func (*BaseExecutorV2) Next(_ context.Context, _ *chunk.Chunk) error {
return nil
}
// Next fills multiple rows into a chunk.
func (*BaseExecutor) Next(_ context.Context, _ *chunk.Chunk) error {
return nil
// BaseExecutor holds common information for executors.
type BaseExecutor struct {
ctx sessionctx.Context
BaseExecutorV2
}
// NewBaseExecutor creates a new BaseExecutor instance.
func NewBaseExecutor(ctx sessionctx.Context, schema *expression.Schema, id int, children ...Executor) BaseExecutor {
return BaseExecutor{
ctx: ctx,
BaseExecutorV2: NewBaseExecutorV2(ctx.GetSessionVars(), schema, id, children...),
}
}
// Ctx return ```sessionctx.Context``` of Executor
@ -206,11 +323,6 @@ func (e *BaseExecutor) Ctx() sessionctx.Context {
return e.ctx
}
// GetSchema gets the schema.
func (e *BaseExecutor) GetSchema() *expression.Schema {
return e.schema
}
// UpdateDeltaForTableID updates the delta info for the table with tableID.
func (e *BaseExecutor) UpdateDeltaForTableID(id int64) {
txnCtx := e.ctx.GetSessionVars().TxnCtx
@ -244,30 +356,6 @@ func (e *BaseExecutor) ReleaseSysSession(ctx context.Context, sctx sessionctx.Co
sysSessionPool.Put(sctx.(pools.Resource))
}
// NewChunk creates a new chunk according to the executor configuration
func (e *BaseExecutor) NewChunk() *chunk.Chunk {
return e.NewChunkWithCapacity(e.RetFieldTypes(), e.InitCap(), e.MaxChunkSize())
}
// NewChunkWithCapacity allows the caller to allocate the chunk with any types, capacity and max size in the pool
func (e *BaseExecutor) NewChunkWithCapacity(fields []*types.FieldType, capacity int, maxCachesize int) *chunk.Chunk {
return e.AllocPool.Alloc(fields, capacity, maxCachesize)
}
// HandleSQLKillerSignal handles the signal sent by SQLKiller
func (e *BaseExecutor) HandleSQLKillerSignal() error {
return e.ctx.GetSessionVars().SQLKiller.HandleSignal()
}
// RegisterSQLAndPlanInExecForTopSQL registers the current SQL and Plan on top sql
// TODO: consider whether it's appropriate to have this on executor
func (e *BaseExecutor) RegisterSQLAndPlanInExecForTopSQL() {
sessVars := e.ctx.GetSessionVars()
if topsqlstate.TopSQLEnabled() && sessVars.StmtCtx.IsSQLAndPlanRegistered.CompareAndSwap(false, true) {
RegisterSQLAndPlanInExecForTopSQL(sessVars)
}
}
// TryNewCacheChunk tries to get a cached chunk
func TryNewCacheChunk(e Executor) *chunk.Chunk {
return e.NewChunk()
@ -331,15 +419,3 @@ func Close(e Executor) (err error) {
}()
return e.Close()
}
// RegisterSQLAndPlanInExecForTopSQL register the sql and plan information if it doesn't register before execution.
// This uses to catch the running SQL when Top SQL is enabled in execution.
func RegisterSQLAndPlanInExecForTopSQL(sessVars *variable.SessionVars) {
stmtCtx := sessVars.StmtCtx
normalizedSQL, sqlDigest := stmtCtx.SQLDigest()
topsql.RegisterSQL(normalizedSQL, sqlDigest, sessVars.InRestrictedSQL)
normalizedPlan, planDigest := stmtCtx.GetPlanDigest()
if len(normalizedPlan) > 0 {
topsql.RegisterPlan(normalizedPlan, planDigest)
}
}

View File

@ -754,7 +754,7 @@ func (c *localMppCoordinator) Execute(ctx context.Context) (kv.Response, []kv.Ke
ctx = distsql.WithSQLKvExecCounterInterceptor(ctx, sctx.GetSessionVars().StmtCtx)
_, allowTiFlashFallback := sctx.GetSessionVars().AllowFallbackToTiKV[kv.TiFlash]
ctx = distsql.SetTiFlashConfVarsInContext(ctx, sctx)
ctx = distsql.SetTiFlashConfVarsInContext(ctx, sctx.GetSessionVars())
c.needTriggerFallback = allowTiFlashFallback
c.enableCollectExecutionInfo = config.GetGlobalConfig().Instance.EnableCollectExecutionInfo.Load()

View File

@ -121,7 +121,7 @@ func (e *MPPGather) Next(ctx context.Context, chk *chunk.Chunk) error {
return nil
}
return table.FillVirtualColumnValue(e.virtualColumnRetFieldTypes, e.virtualColumnIndex, e.Schema().Columns, e.columns, e.Ctx(), chk)
return table.FillVirtualColumnValue(e.virtualColumnRetFieldTypes, e.virtualColumnIndex, e.Schema().Columns, e.columns, e.Ctx().GetExprCtx(), chk)
}
// Close and release the used resources.

View File

@ -352,7 +352,7 @@ func (e *PointGetExecutor) Next(ctx context.Context, req *chunk.Chunk) error {
}
err = table.FillVirtualColumnValue(e.virtualColumnRetFieldTypes, e.virtualColumnIndex,
e.Schema().Columns, e.columns, e.Ctx(), req)
e.Schema().Columns, e.columns, e.Ctx().GetExprCtx(), req)
if err != nil {
return err
}

View File

@ -22,21 +22,25 @@ import (
"time"
"unsafe"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/distsql"
distsqlctx "github.com/pingcap/tidb/pkg/distsql/context"
"github.com/pingcap/tidb/pkg/domain"
"github.com/pingcap/tidb/pkg/domain/infosync"
"github.com/pingcap/tidb/pkg/executor/internal/builder"
"github.com/pingcap/tidb/pkg/executor/internal/exec"
internalutil "github.com/pingcap/tidb/pkg/executor/internal/util"
"github.com/pingcap/tidb/pkg/expression"
exprctx "github.com/pingcap/tidb/pkg/expression/context"
"github.com/pingcap/tidb/pkg/infoschema"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/parser/model"
planctx "github.com/pingcap/tidb/pkg/planner/context"
plannercore "github.com/pingcap/tidb/pkg/planner/core"
"github.com/pingcap/tidb/pkg/planner/util"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessiontxn"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/pingcap/tidb/pkg/table"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util/chunk"
@ -54,16 +58,16 @@ var _ exec.Executor = &TableReaderExecutor{}
// selectResultHook is used to hack distsql.SelectWithRuntimeStats safely for testing.
type selectResultHook struct {
selectResultFunc func(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request,
selectResultFunc func(ctx context.Context, dctx distsqlctx.DistSQLContext, kvReq *kv.Request,
fieldTypes []*types.FieldType, copPlanIDs []int) (distsql.SelectResult, error)
}
func (sr selectResultHook) SelectResult(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request,
func (sr selectResultHook) SelectResult(ctx context.Context, dctx distsqlctx.DistSQLContext, kvReq *kv.Request,
fieldTypes []*types.FieldType, copPlanIDs []int, rootPlanID int) (distsql.SelectResult, error) {
if sr.selectResultFunc == nil {
return distsql.SelectWithRuntimeStats(ctx, sctx, kvReq, fieldTypes, copPlanIDs, rootPlanID)
return distsql.SelectWithRuntimeStats(ctx, dctx, kvReq, fieldTypes, copPlanIDs, rootPlanID)
}
return sr.selectResultFunc(ctx, sctx, kvReq, fieldTypes, copPlanIDs)
return sr.selectResultFunc(ctx, dctx, kvReq, fieldTypes, copPlanIDs)
}
type kvRangeBuilder interface {
@ -71,9 +75,60 @@ type kvRangeBuilder interface {
buildKeyRangeSeparately(ranges []*ranger.Range) ([]int64, [][]kv.KeyRange, error)
}
// tableReaderExecutorContext is the execution context for the `TableReaderExecutor`
type tableReaderExecutorContext struct {
dctx distsqlctx.DistSQLContext
pctx planctx.PlanContext
ectx exprctx.BuildContext
getDDLOwner func(context.Context) (*infosync.ServerInfo, error)
}
func (treCtx *tableReaderExecutorContext) GetSessionVars() *variable.SessionVars {
return treCtx.dctx.GetSessionVars()
}
func (treCtx *tableReaderExecutorContext) GetInfoSchema() infoschema.InfoSchema {
return treCtx.pctx.GetInfoSchema().(infoschema.InfoSchema)
}
func (treCtx *tableReaderExecutorContext) GetDDLOwner(ctx context.Context) (*infosync.ServerInfo, error) {
if treCtx.getDDLOwner != nil {
return treCtx.getDDLOwner(ctx)
}
return nil, errors.New("GetDDLOwner in a context without DDL")
}
func newTableReaderExecutorContext(sctx sessionctx.Context) tableReaderExecutorContext {
// Explicitly get `ownerManager` out of the closure to show that the `tableReaderExecutorContext` itself doesn't
// depend on `sctx` directly.
// The context of some tests don't have `DDL`, so make it optional
var getDDLOwner func(ctx context.Context) (*infosync.ServerInfo, error)
ddl := domain.GetDomain(sctx).DDL()
if ddl != nil {
ownerManager := ddl.OwnerManager()
getDDLOwner = func(ctx context.Context) (*infosync.ServerInfo, error) {
ddlOwnerID, err := ownerManager.GetOwnerID(ctx)
if err != nil {
return nil, err
}
return infosync.GetServerInfoByID(ctx, ddlOwnerID)
}
}
return tableReaderExecutorContext{
dctx: sctx.GetDistSQLCtx(),
pctx: sctx.GetPlanCtx(),
ectx: sctx.GetExprCtx(),
getDDLOwner: getDDLOwner,
}
}
// TableReaderExecutor sends DAG request and reads table data from kv layer.
type TableReaderExecutor struct {
exec.BaseExecutor
tableReaderExecutorContext
exec.BaseExecutorV2
table table.Table
@ -169,25 +224,25 @@ func (e *TableReaderExecutor) Open(ctx context.Context) error {
} else {
e.memTracker = memory.NewTracker(e.ID(), -1)
}
e.memTracker.AttachTo(e.Ctx().GetSessionVars().StmtCtx.MemTracker)
e.memTracker.AttachTo(e.GetSessionVars().StmtCtx.MemTracker)
var err error
if e.corColInFilter {
// If there's correlated column in filter, need to rewrite dagPB
if e.storeType == kv.TiFlash {
execs, err := builder.ConstructTreeBasedDistExec(e.Ctx(), e.tablePlan)
execs, err := builder.ConstructTreeBasedDistExec(e.pctx, e.tablePlan)
if err != nil {
return err
}
e.dagPB.RootExecutor = execs[0]
} else {
e.dagPB.Executors, err = builder.ConstructListBasedDistExec(e.Ctx(), e.plans)
e.dagPB.Executors, err = builder.ConstructListBasedDistExec(e.pctx, e.plans)
if err != nil {
return err
}
}
}
if e.RuntimeStats() != nil {
if e.GetSessionVars().StmtCtx.RuntimeStatsColl != nil {
collExec := true
e.dagPB.CollectExecutionSummaries = &collExec
}
@ -270,7 +325,7 @@ func (e *TableReaderExecutor) Next(ctx context.Context, req *chunk.Chunk) error
return err
}
err := table.FillVirtualColumnValue(e.virtualColumnRetFieldTypes, e.virtualColumnIndex, e.Schema().Columns, e.columns, e.Ctx(), req)
err := table.FillVirtualColumnValue(e.virtualColumnRetFieldTypes, e.virtualColumnIndex, e.Schema().Columns, e.columns, e.ectx, req)
if err != nil {
return err
}
@ -303,7 +358,7 @@ func (e *TableReaderExecutor) buildResp(ctx context.Context, ranges []*ranger.Ra
}
var results []distsql.SelectResult
for _, kvReq := range kvReqs {
result, err := e.SelectResult(ctx, e.Ctx(), kvReq, exec.RetTypes(e), getPhysicalPlanIDs(e.plans), e.ID())
result, err := e.SelectResult(ctx, e.dctx, kvReq, exec.RetTypes(e), getPhysicalPlanIDs(e.plans), e.ID())
if err != nil {
return nil, err
}
@ -316,7 +371,7 @@ func (e *TableReaderExecutor) buildResp(ctx context.Context, ranges []*ranger.Ra
if err != nil {
return nil, err
}
result, err := e.SelectResult(ctx, e.Ctx(), kvReq, exec.RetTypes(e), getPhysicalPlanIDs(e.plans), e.ID())
result, err := e.SelectResult(ctx, e.dctx, kvReq, exec.RetTypes(e), getPhysicalPlanIDs(e.plans), e.ID())
if err != nil {
return nil, err
}
@ -331,7 +386,7 @@ func (e *TableReaderExecutor) buildResp(ctx context.Context, ranges []*ranger.Ra
}
var results []distsql.SelectResult
for _, kvReq := range kvReqs {
result, err := e.SelectResult(ctx, e.Ctx(), kvReq, exec.RetTypes(e), getPhysicalPlanIDs(e.plans), e.ID())
result, err := e.SelectResult(ctx, e.dctx, kvReq, exec.RetTypes(e), getPhysicalPlanIDs(e.plans), e.ID())
if err != nil {
return nil, err
}
@ -352,7 +407,7 @@ func (e *TableReaderExecutor) buildResp(ctx context.Context, ranges []*ranger.Ra
})
e.kvRanges = kvReq.KeyRanges.AppendSelfTo(e.kvRanges)
result, err := e.SelectResult(ctx, e.Ctx(), kvReq, exec.RetTypes(e), getPhysicalPlanIDs(e.plans), e.ID())
result, err := e.SelectResult(ctx, e.dctx, kvReq, exec.RetTypes(e), getPhysicalPlanIDs(e.plans), e.ID())
if err != nil {
return nil, err
}
@ -379,14 +434,14 @@ func (e *TableReaderExecutor) buildKVReqSeparately(ctx context.Context, ranges [
SetKeepOrder(e.keepOrder).
SetTxnScope(e.txnScope).
SetReadReplicaScope(e.readReplicaScope).
SetFromSessionVars(e.Ctx().GetSessionVars()).
SetFromInfoSchema(e.Ctx().GetInfoSchema()).
SetFromSessionVars(e.GetSessionVars()).
SetFromInfoSchema(e.GetInfoSchema()).
SetMemTracker(e.memTracker).
SetStoreType(e.storeType).
SetPaging(e.paging).
SetAllowBatchCop(e.batchCop).
SetClosestReplicaReadAdjuster(newClosestReadAdjuster(e.Ctx(), &reqBuilder.Request, e.netDataSize)).
SetConnIDAndConnAlias(e.Ctx().GetSessionVars().ConnectionID, e.Ctx().GetSessionVars().SessionAlias).
SetClosestReplicaReadAdjuster(newClosestReadAdjuster(e.GetSessionVars(), &reqBuilder.Request, e.netDataSize)).
SetConnIDAndConnAlias(e.GetSessionVars().ConnectionID, e.GetSessionVars().SessionAlias).
Build()
if err != nil {
return nil, err
@ -421,14 +476,14 @@ func (e *TableReaderExecutor) buildKVReqForPartitionTableScan(ctx context.Contex
SetKeepOrder(e.keepOrder).
SetTxnScope(e.txnScope).
SetReadReplicaScope(e.readReplicaScope).
SetFromSessionVars(e.Ctx().GetSessionVars()).
SetFromInfoSchema(e.Ctx().GetInfoSchema()).
SetFromSessionVars(e.GetSessionVars()).
SetFromInfoSchema(e.GetInfoSchema()).
SetMemTracker(e.memTracker).
SetStoreType(e.storeType).
SetPaging(e.paging).
SetAllowBatchCop(e.batchCop).
SetClosestReplicaReadAdjuster(newClosestReadAdjuster(e.Ctx(), &reqBuilder.Request, e.netDataSize)).
SetConnIDAndConnAlias(e.Ctx().GetSessionVars().ConnectionID, e.Ctx().GetSessionVars().SessionAlias).
SetClosestReplicaReadAdjuster(newClosestReadAdjuster(e.GetSessionVars(), &reqBuilder.Request, e.netDataSize)).
SetConnIDAndConnAlias(e.GetSessionVars().ConnectionID, e.GetSessionVars().SessionAlias).
Build()
if err != nil {
return nil, err
@ -446,17 +501,12 @@ func (e *TableReaderExecutor) buildKVReq(ctx context.Context, ranges []*ranger.R
}
reqBuilder = builder.SetPartitionKeyRanges(kvRange)
} else {
reqBuilder = builder.SetHandleRanges(e.Ctx().GetSessionVars().StmtCtx, getPhysicalTableID(e.table), e.table.Meta() != nil && e.table.Meta().IsCommonHandle, ranges)
reqBuilder = builder.SetHandleRanges(e.GetSessionVars().StmtCtx, getPhysicalTableID(e.table), e.table.Meta() != nil && e.table.Meta().IsCommonHandle, ranges)
}
if e.table != nil && e.table.Type().IsClusterTable() {
copDestination := infoschema.GetClusterTableCopDestination(e.table.Meta().Name.L)
if copDestination == infoschema.DDLOwner {
ownerManager := domain.GetDomain(e.Ctx()).DDL().OwnerManager()
ddlOwnerID, err := ownerManager.GetOwnerID(ctx)
if err != nil {
return nil, err
}
serverInfo, err := infosync.GetServerInfoByID(ctx, ddlOwnerID)
serverInfo, err := e.GetDDLOwner(ctx)
if err != nil {
return nil, err
}
@ -471,14 +521,14 @@ func (e *TableReaderExecutor) buildKVReq(ctx context.Context, ranges []*ranger.R
SetTxnScope(e.txnScope).
SetReadReplicaScope(e.readReplicaScope).
SetIsStaleness(e.isStaleness).
SetFromSessionVars(e.Ctx().GetSessionVars()).
SetFromInfoSchema(sessiontxn.GetTxnManager(e.Ctx()).GetTxnInfoSchema()).
SetFromSessionVars(e.GetSessionVars()).
SetFromInfoSchema(e.GetInfoSchema()).
SetMemTracker(e.memTracker).
SetStoreType(e.storeType).
SetAllowBatchCop(e.batchCop).
SetClosestReplicaReadAdjuster(newClosestReadAdjuster(e.Ctx(), &reqBuilder.Request, e.netDataSize)).
SetClosestReplicaReadAdjuster(newClosestReadAdjuster(e.GetSessionVars(), &reqBuilder.Request, e.netDataSize)).
SetPaging(e.paging).
SetConnIDAndConnAlias(e.Ctx().GetSessionVars().ConnectionID, e.Ctx().GetSessionVars().SessionAlias)
SetConnIDAndConnAlias(e.GetSessionVars().ConnectionID, e.GetSessionVars().SessionAlias)
return reqBuilder.Build()
}

View File

@ -21,6 +21,7 @@ import (
"testing"
"github.com/pingcap/tidb/pkg/distsql"
distsqlctx "github.com/pingcap/tidb/pkg/distsql/context"
"github.com/pingcap/tidb/pkg/executor/internal/builder"
"github.com/pingcap/tidb/pkg/executor/internal/exec"
"github.com/pingcap/tidb/pkg/expression"
@ -111,7 +112,7 @@ func mockDistsqlSelectCtxGet(ctx context.Context) (totalRows int, expectedRowsRe
return
}
func mockSelectResult(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request,
func mockSelectResult(ctx context.Context, dctx distsqlctx.DistSQLContext, kvReq *kv.Request,
fieldTypes []*types.FieldType, copPlanIDs []int) (distsql.SelectResult, error) {
totalRows, expectedRowsRet := mockDistsqlSelectCtxGet(ctx)
return &requiredRowsSelectResult{
@ -122,11 +123,19 @@ func mockSelectResult(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Re
}
func buildTableReader(sctx sessionctx.Context) exec.Executor {
retTypes := []*types.FieldType{types.NewFieldType(mysql.TypeDouble), types.NewFieldType(mysql.TypeLonglong)}
cols := make([]*expression.Column, len(retTypes))
for i := range retTypes {
cols[i] = &expression.Column{Index: i, RetType: retTypes[i]}
}
schema := expression.NewSchema(cols...)
e := &TableReaderExecutor{
BaseExecutor: buildMockBaseExec(sctx),
table: &tables.TableCommon{},
dagPB: buildMockDAGRequest(sctx),
selectResultHook: selectResultHook{mockSelectResult},
BaseExecutorV2: exec.NewBaseExecutorV2(sctx.GetSessionVars(), schema, 0),
tableReaderExecutorContext: newTableReaderExecutorContext(sctx),
table: &tables.TableCommon{},
dagPB: buildMockDAGRequest(sctx),
selectResultHook: selectResultHook{mockSelectResult},
}
return e
}

View File

@ -24,6 +24,7 @@ go_library(
"//pkg/ddl/placement",
"//pkg/ddl/schematracker",
"//pkg/ddl/syncer",
"//pkg/distsql/context",
"//pkg/disttask/framework/proto",
"//pkg/disttask/framework/scheduler",
"//pkg/disttask/framework/storage",

View File

@ -43,6 +43,7 @@ import (
"github.com/pingcap/tidb/pkg/config"
"github.com/pingcap/tidb/pkg/ddl"
"github.com/pingcap/tidb/pkg/ddl/placement"
distsqlctx "github.com/pingcap/tidb/pkg/distsql/context"
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
"github.com/pingcap/tidb/pkg/disttask/framework/scheduler"
"github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor"
@ -2620,6 +2621,11 @@ func (s *session) GetTableCtx() tbctx.MutateContext {
return s.tblctx
}
// GetDistSQLCtx returns the context used in DistSQL
func (s *session) GetDistSQLCtx() distsqlctx.DistSQLContext {
return s
}
func (s *session) AuthPluginForUser(user *auth.UserIdentity) (string, error) {
pm := privilege.GetPrivilegeManager(s)
authplugin, err := pm.GetAuthPluginForConnection(user.Username, user.Hostname)

View File

@ -6,6 +6,7 @@ go_library(
importpath = "github.com/pingcap/tidb/pkg/sessionctx",
visibility = ["//visibility:public"],
deps = [
"//pkg/distsql/context",
"//pkg/expression/context",
"//pkg/extension",
"//pkg/infoschema/context",

View File

@ -20,6 +20,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
distsqlctx "github.com/pingcap/tidb/pkg/distsql/context"
exprctx "github.com/pingcap/tidb/pkg/expression/context"
"github.com/pingcap/tidb/pkg/extension"
infoschema "github.com/pingcap/tidb/pkg/infoschema/context"
@ -106,6 +107,9 @@ type Context interface {
// GetPlanCtx gets the plan context of the current session.
GetPlanCtx() planctx.PlanContext
// GetDistSQLCtx gets the distsql ctx of the current session
GetDistSQLCtx() distsqlctx.DistSQLContext
GetSessionManager() util.SessionManager
// RefreshTxnCtx commits old transaction without retry,

View File

@ -14,6 +14,7 @@ go_library(
"//pkg/errctx",
"//pkg/errno",
"//pkg/expression",
"//pkg/expression/context",
"//pkg/kv",
"//pkg/meta/autoid",
"//pkg/parser",

View File

@ -27,6 +27,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/tidb/pkg/errctx"
"github.com/pingcap/tidb/pkg/expression"
exprctx "github.com/pingcap/tidb/pkg/expression/context"
"github.com/pingcap/tidb/pkg/parser"
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/parser/charset"
@ -743,7 +744,7 @@ func OptionalFsp(fieldType *types.FieldType) string {
// FillVirtualColumnValue will calculate the virtual column value by evaluating generated
// expression using rows from a chunk, and then fill this value into the chunk.
func FillVirtualColumnValue(virtualRetTypes []*types.FieldType, virtualColumnIndex []int,
expCols []*expression.Column, colInfos []*model.ColumnInfo, sctx sessionctx.Context, req *chunk.Chunk) error {
expCols []*expression.Column, colInfos []*model.ColumnInfo, ectx exprctx.BuildContext, req *chunk.Chunk) error {
if len(virtualColumnIndex) == 0 {
return nil
}
@ -752,19 +753,19 @@ func FillVirtualColumnValue(virtualRetTypes []*types.FieldType, virtualColumnInd
iter := chunk.NewIterator4Chunk(req)
for i, idx := range virtualColumnIndex {
for row := iter.Begin(); row != iter.End(); row = iter.Next() {
datum, err := expCols[idx].EvalVirtualColumn(sctx.GetExprCtx(), row)
datum, err := expCols[idx].EvalVirtualColumn(ectx, row)
if err != nil {
return err
}
// Because the expression might return different type from
// the generated column, we should wrap a CAST on the result.
castDatum, err := CastValue(sctx, datum, colInfos[idx], false, true)
castDatum, err := CastColumnValue(ectx.GetSessionVars(), datum, colInfos[idx], false, true)
if err != nil {
return err
}
// Clip to zero if get negative value after cast to unsigned.
if mysql.HasUnsignedFlag(colInfos[idx].FieldType.GetFlag()) && !castDatum.IsNull() && sctx.GetSessionVars().StmtCtx.TypeFlags().AllowNegativeToUnsigned() {
if mysql.HasUnsignedFlag(colInfos[idx].FieldType.GetFlag()) && !castDatum.IsNull() && ectx.GetSessionVars().StmtCtx.TypeFlags().AllowNegativeToUnsigned() {
switch datum.Kind() {
case types.KindInt64:
if datum.GetInt64() < 0 {

View File

@ -12,6 +12,7 @@ go_library(
importpath = "github.com/pingcap/tidb/pkg/util/mock",
visibility = ["//visibility:public"],
deps = [
"//pkg/distsql/context",
"//pkg/expression/context",
"//pkg/expression/contextimpl",
"//pkg/extension",

View File

@ -22,6 +22,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
distsqlctx "github.com/pingcap/tidb/pkg/distsql/context"
exprctx "github.com/pingcap/tidb/pkg/expression/context"
exprctximpl "github.com/pingcap/tidb/pkg/expression/contextimpl"
"github.com/pingcap/tidb/pkg/extension"
@ -215,6 +216,11 @@ func (c *Context) GetTableCtx() tbctx.MutateContext {
return c.tblctx
}
// GetDistSQLCtx returns the distsql context of the session
func (c *Context) GetDistSQLCtx() distsqlctx.DistSQLContext {
return c
}
// Txn implements sessionctx.Context Txn interface.
func (c *Context) Txn(bool) (kv.Transaction, error) {
return &c.txn, nil