// Copyright 2015 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package core

import (
	"context"
	"fmt"
	"math"
	"slices"
	"strconv"
	"time"

	"github.com/pingcap/errors"
	"github.com/pingcap/failpoint"
	"github.com/pingcap/kvproto/pkg/diagnosticspb"
	"github.com/pingcap/tidb/pkg/config"
	"github.com/pingcap/tidb/pkg/expression"
	"github.com/pingcap/tidb/pkg/expression/aggregation"
	"github.com/pingcap/tidb/pkg/infoschema"
	"github.com/pingcap/tidb/pkg/kv"
	"github.com/pingcap/tidb/pkg/lock"
	tablelock "github.com/pingcap/tidb/pkg/lock/context"
	"github.com/pingcap/tidb/pkg/meta/model"
	"github.com/pingcap/tidb/pkg/parser/ast"
	"github.com/pingcap/tidb/pkg/parser/auth"
	"github.com/pingcap/tidb/pkg/parser/mysql"
	"github.com/pingcap/tidb/pkg/planner/cascades"
	"github.com/pingcap/tidb/pkg/planner/cascades/impl"
	"github.com/pingcap/tidb/pkg/planner/core/base"
	"github.com/pingcap/tidb/pkg/planner/core/operator/logicalop"
	"github.com/pingcap/tidb/pkg/planner/core/operator/physicalop"
	"github.com/pingcap/tidb/pkg/planner/core/resolve"
	"github.com/pingcap/tidb/pkg/planner/core/rule"
	"github.com/pingcap/tidb/pkg/planner/property"
	"github.com/pingcap/tidb/pkg/planner/util/costusage"
	"github.com/pingcap/tidb/pkg/privilege"
	"github.com/pingcap/tidb/pkg/sessionctx"
	"github.com/pingcap/tidb/pkg/sessionctx/vardef"
	"github.com/pingcap/tidb/pkg/types"
	"github.com/pingcap/tidb/pkg/util"
	"github.com/pingcap/tidb/pkg/util/dbterror/plannererrors"
	"github.com/pingcap/tidb/pkg/util/dbutil"
	utilhint "github.com/pingcap/tidb/pkg/util/hint"
	"github.com/pingcap/tidb/pkg/util/logutil"
	"github.com/pingcap/tidb/pkg/util/memory"
	"github.com/pingcap/tidb/pkg/util/set"
	"github.com/pingcap/tidb/pkg/util/size"
	"github.com/pingcap/tipb/go-tipb"
	"go.uber.org/atomic"
	"go.uber.org/zap"
)

// OptimizeAstNode optimizes the query to a physical plan directly.
var OptimizeAstNode func(ctx context.Context, sctx sessionctx.Context, node *resolve.NodeW, is infoschema.InfoSchema) (base.Plan, types.NameSlice, error)

// OptimizeAstNodeNoCache bypasses the plan cache and generates a physical plan directly.
var OptimizeAstNodeNoCache func(ctx context.Context, sctx sessionctx.Context, node *resolve.NodeW, is infoschema.InfoSchema) (base.Plan, types.NameSlice, error)

// AllowCartesianProduct indicates whether TiDB allows Cartesian joins without equal conditions.
var AllowCartesianProduct = atomic.NewBool(true)

// initialMaxCores is a sentinel that is larger than any realistic logical core count.
const initialMaxCores uint64 = 10000

var (
	// logicalRuleList is the old reference to optRuleList, kept for downgrading to the old optimization routine.
	logicalRuleList = optRuleList
	// normalizeRuleList is dedicated to the pre-phase of the memo, which applies only always-good rules.
	normalizeRuleList = optRuleList
	// Note: these two lists will differ once some trade-off rules are moved out of the normalization phase for the cascades planner.
)

var optRuleList = []base.LogicalOptRule{
	&GcSubstituter{},
	&rule.ColumnPruner{},
	&ResultReorder{},
	&rule.BuildKeySolver{},
	&DecorrelateSolver{},
	&SemiJoinRewriter{},
	&AggregationEliminator{},
	&SkewDistinctAggRewriter{},
	&ProjectionEliminator{},
	&rule.MaxMinEliminator{},
	&rule.ConstantPropagationSolver{},
	&ConvertOuterToInnerJoin{},
	&PPDSolver{},
	&OuterJoinEliminator{},
	&rule.PartitionProcessor{},
	&rule.CollectPredicateColumnsPoint{},
	&AggregationPushDownSolver{},
	&DeriveTopNFromWindow{},
	&rule.PredicateSimplification{},
	&PushDownTopNOptimizer{},
	&rule.SyncWaitStatsLoadPoint{},
	&JoinReOrderSolver{},
	&rule.ColumnPruner{}, // column pruning again at last; note it will mess up the results of buildKeySolver
	&PushDownSequenceSolver{},
	&EliminateUnionAllDualItem{},
	&EmptySelectionEliminator{},
	&ResolveExpand{},
}
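
// Illustrative sketch (not part of the planner): each rule in optRuleList is
// addressed by the bit at its index, so a flag value selects the subset of
// rules to run. Assuming the rule.Flag* constants line up with the positions
// above, a dispatch loop equivalent to the one in logicalOptimize looks like:
//
//	for i, r := range optRuleList {
//		if flag&(1<<uint(i)) != 0 && !isLogicalRuleDisabled(r) {
//			logic, _, err = r.Optimize(ctx, logic)
//		}
//	}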

// Interaction Rule List
/* An interaction rule is triggered when both of the following conditions hold:
   1. The related rule has been triggered and has changed the plan.
   2. The interaction rule is enabled.
*/
var optInteractionRuleList = map[base.LogicalOptRule]base.LogicalOptRule{}

// BuildLogicalPlanForTest builds a logical plan from an ast.Node for testing purposes.
func BuildLogicalPlanForTest(ctx context.Context, sctx sessionctx.Context, node *resolve.NodeW, infoSchema infoschema.InfoSchema) (base.Plan, error) {
	sctx.GetSessionVars().PlanID.Store(0)
	sctx.GetSessionVars().PlanColumnID.Store(0)
	builder, _ := NewPlanBuilder().Init(sctx.GetPlanCtx(), infoSchema, utilhint.NewQBHintHandler(nil))
	p, err := builder.Build(ctx, node)
	if err != nil {
		return nil, err
	}
	if logic, ok := p.(base.LogicalPlan); ok {
		RecheckCTE(logic)
	}
	return p, err
}

// CheckPrivilege checks the privilege for a user.
func CheckPrivilege(activeRoles []*auth.RoleIdentity, pm privilege.Manager, vs []visitInfo) error {
	for _, v := range vs {
		if v.privilege == mysql.ExtendedPriv {
			hasPriv := false
			for _, priv := range v.dynamicPrivs {
				hasPriv = hasPriv || pm.RequestDynamicVerification(activeRoles, priv, v.dynamicWithGrant)
				if hasPriv {
					break
				}
			}
			if !hasPriv {
				if v.err == nil {
					return plannererrors.ErrPrivilegeCheckFail.GenWithStackByArgs(v.dynamicPrivs)
				}
				return v.err
			}
		} else if !pm.RequestVerification(activeRoles, v.db, v.table, v.column, v.privilege) {
			if v.err == nil {
				return plannererrors.ErrPrivilegeCheckFail.GenWithStackByArgs(v.privilege.String())
			}
			return v.err
		}
	}
	return nil
}
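
// Illustrative example (hypothetical values): a visitInfo whose privilege is
// mysql.ExtendedPriv and whose dynamicPrivs is ["BACKUP_ADMIN", "RESTORE_ADMIN"]
// passes as soon as the user holds either dynamic privilege, while every other
// visitInfo requires the exact static privilege on the referenced
// db/table/column.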

// VisitInfo4PrivCheck generates privilege check infos, because the privilege check for local temporary tables
// differs from that for normal tables. A `CREATE` statement needs the `CREATE TEMPORARY TABLE` privilege on the
// database, and subsequent statements do not need any privileges.
func VisitInfo4PrivCheck(ctx context.Context, is infoschema.InfoSchema, node ast.Node, vs []visitInfo) (privVisitInfo []visitInfo) {
	if node == nil {
		return vs
	}

	switch stmt := node.(type) {
	case *ast.CreateTableStmt:
		privVisitInfo = make([]visitInfo, 0, len(vs))
		for _, v := range vs {
			if v.privilege == mysql.CreatePriv {
				if stmt.TemporaryKeyword == ast.TemporaryLocal {
					// `CREATE TEMPORARY TABLE` privilege is required on the database, not the table.
					newVisitInfo := v
					newVisitInfo.privilege = mysql.CreateTMPTablePriv
					newVisitInfo.table = ""
					privVisitInfo = append(privVisitInfo, newVisitInfo)
				} else {
					// If both the normal table and the temporary table already exist, we need to check the privilege.
					privVisitInfo = append(privVisitInfo, v)
				}
			} else {
				// `CREATE TABLE LIKE tmp` or `CREATE TABLE FROM SELECT tmp` in the future.
				if needCheckTmpTablePriv(ctx, is, v) {
					privVisitInfo = append(privVisitInfo, v)
				}
			}
		}
	case *ast.DropTableStmt:
		// Dropping a local temporary table doesn't need any privileges.
		if stmt.IsView {
			privVisitInfo = vs
		} else {
			privVisitInfo = make([]visitInfo, 0, len(vs))
			if stmt.TemporaryKeyword != ast.TemporaryLocal {
				for _, v := range vs {
					if needCheckTmpTablePriv(ctx, is, v) {
						privVisitInfo = append(privVisitInfo, v)
					}
				}
			}
		}
	case *ast.GrantStmt, *ast.DropSequenceStmt, *ast.DropPlacementPolicyStmt:
		// Some statements ignore local temporary tables, so they should check the privileges on normal tables.
		privVisitInfo = vs
	default:
		privVisitInfo = make([]visitInfo, 0, len(vs))
		for _, v := range vs {
			if needCheckTmpTablePriv(ctx, is, v) {
				privVisitInfo = append(privVisitInfo, v)
			}
		}
	}
	return
}

func needCheckTmpTablePriv(ctx context.Context, is infoschema.InfoSchema, v visitInfo) bool {
	if v.db != "" && v.table != "" {
		// Statements on local temporary tables other than `CREATE` do not check any privileges.
		tb, err := is.TableByName(ctx, ast.NewCIStr(v.db), ast.NewCIStr(v.table))
		// If the table doesn't exist, we do not report errors to avoid leaking the existence of the table.
		if err == nil && tb.Meta().TempTableType == model.TempTableLocal {
			return false
		}
	}
	return true
}

// CheckTableLock checks the table lock.
func CheckTableLock(ctx tablelock.TableLockReadContext, is infoschema.InfoSchema, vs []visitInfo) error {
	if !config.TableLockEnabled() {
		return nil
	}

	checker := lock.NewChecker(ctx, is)
	for i := range vs {
		err := checker.CheckTableLock(vs[i].db, vs[i].table, vs[i].privilege, vs[i].alterWritable)
		// If a write-locked table has been dropped, we can still access other tables, e.g. in a `rename` operation.
		if err == lock.ErrLockedTableDropped {
			break
		}
		if err != nil {
			return err
		}
	}
	return nil
}

// CheckTableMode checks whether the table is accessible by table mode; only TableModeNormal can be accessed.
func CheckTableMode(node *resolve.NodeW) error {
	// First, make exceptions for statements that only visit table metadata,
	// for example, `describe <table_name>` and `show create table <table_name>`.
	switch node.Node.(type) {
	case *ast.ShowStmt, *ast.ExplainStmt:
	default:
		// Special handling for `ADMIN CHECKSUM TABLE`, as `IMPORT INTO`
		// executes this statement during the post-import checksum to verify data.
		// TODO: only allow `ADMIN CHECKSUM TABLE` from an import into task
		adminStmt, ok := node.Node.(*ast.AdminStmt)
		if ok && adminStmt.Tp == ast.AdminChecksumTable {
			return nil
		}
		for _, tblNameW := range node.GetResolveContext().GetTableNames() {
			if err := dbutil.CheckTableModeIsNormal(tblNameW.Name, tblNameW.TableInfo.Mode); err != nil {
				return err
			}
		}
	}
	return nil
}

func checkStableResultMode(sctx base.PlanContext) bool {
	s := sctx.GetSessionVars()
	st := s.StmtCtx
	return s.EnableStableResultMode && (!st.InInsertStmt && !st.InUpdateStmt && !st.InDeleteStmt && !st.InLoadDataStmt)
}

// doOptimize optimizes a logical plan into a physical plan,
// while also returning the optimized logical plan, the final physical plan, and the cost of the final plan.
// The returned logical plan is necessary for generating plans for Common Table Expressions (CTEs).
func doOptimize(ctx context.Context, sctx base.PlanContext, flag uint64, logic base.LogicalPlan) (
	base.LogicalPlan, base.PhysicalPlan, float64, error) {
	if sctx.GetSessionVars().EnableCascadesPlanner {
		return CascadesOptimize(ctx, sctx, flag, logic)
	}
	return VolcanoOptimize(ctx, sctx, flag, logic)
}

// CascadesOptimize includes: normalization, cascades optimization, and physical optimization.
func CascadesOptimize(ctx context.Context, sctx base.PlanContext, flag uint64, logic base.LogicalPlan) (base.LogicalPlan, base.PhysicalPlan, float64, error) {
	flag = adjustOptimizationFlags(flag, logic)
	logic, err := normalizeOptimize(ctx, flag, logic)
	if err != nil {
		return nil, nil, 0, err
	}
	if !AllowCartesianProduct.Load() && existsCartesianProduct(logic) {
		return nil, nil, 0, errors.Trace(plannererrors.ErrCartesianProductUnsupported)
	}
	logic.ExtractFD()

	var cas *cascades.Optimizer
	if cas, err = cascades.NewOptimizer(logic); err == nil {
		defer cas.Destroy()
		err = cas.Execute()
	}
	if err != nil {
		return nil, nil, 0, err
	}
	var (
		cost     float64
		physical base.PhysicalPlan
	)
	physical, cost, err = impl.ImplementMemoAndCost(cas.GetMemo().GetRootGroup())
	if err != nil {
		return nil, nil, 0, err
	}

	finalPlan := postOptimize(ctx, sctx, physical)
	return logic, finalPlan, cost, nil
}

// VolcanoOptimize includes: logical optimization and physical optimization.
func VolcanoOptimize(ctx context.Context, sctx base.PlanContext, flag uint64, logic base.LogicalPlan) (base.LogicalPlan, base.PhysicalPlan, float64, error) {
	flag = adjustOptimizationFlags(flag, logic)
	logic, err := logicalOptimize(ctx, flag, logic)
	if err != nil {
		return nil, nil, 0, err
	}
	if !AllowCartesianProduct.Load() && existsCartesianProduct(logic) {
		return nil, nil, 0, errors.Trace(plannererrors.ErrCartesianProductUnsupported)
	}
	failpoint.Inject("ConsumeVolcanoOptimizePanic", nil)
	physical, cost, err := physicalOptimize(logic)
	if err != nil {
		return nil, nil, 0, err
	}
	finalPlan := postOptimize(ctx, sctx, physical)
	return logic, finalPlan, cost, nil
}

func adjustOptimizationFlags(flag uint64, logic base.LogicalPlan) uint64 {
	// If any rule runs after FlagPruneColumns, do FlagPruneColumnsAgain.
	if flag&rule.FlagPruneColumns > 0 && flag-rule.FlagPruneColumns > rule.FlagPruneColumns {
		flag |= rule.FlagPruneColumnsAgain
	}
	if checkStableResultMode(logic.SCtx()) {
		flag |= rule.FlagStabilizeResults
	}
	if logic.SCtx().GetSessionVars().StmtCtx.StraightJoinOrder {
		// When the straight join order hint is used, disable the join reorder optimization.
		flag &= ^rule.FlagJoinReOrder
	}
	// InternalSQLScanUserTable is for TTL scans.
	if !logic.SCtx().GetSessionVars().InRestrictedSQL || logic.SCtx().GetSessionVars().InternalSQLScanUserTable {
		flag |= rule.FlagCollectPredicateColumnsPoint
		flag |= rule.FlagSyncWaitStatsLoadPoint
	}
	if !logic.SCtx().GetSessionVars().StmtCtx.UseDynamicPruneMode {
		flag |= rule.FlagPartitionProcessor // apply partition pruning under static mode
	}
	return flag
}
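
// Illustrative note: the "flag-rule.FlagPruneColumns > rule.FlagPruneColumns"
// test above relies on every rule flag being a distinct power of two. After
// subtracting FlagPruneColumns, a remainder larger than FlagPruneColumns can
// only come from a higher-order bit, i.e. some rule scheduled after column
// pruning. A hypothetical standalone form of the same check:
//
//	func needsPruneAgain(flag, pruneBit uint64) bool {
//		return flag&pruneBit > 0 && flag-pruneBit > pruneBit
//	}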

// DoOptimize optimizes a logical plan to a physical plan.
func DoOptimize(
	ctx context.Context,
	sctx base.PlanContext,
	flag uint64,
	logic base.LogicalPlan,
) (base.PhysicalPlan, float64, error) {
	_, finalPlan, cost, err := doOptimize(ctx, sctx, flag, logic)
	return finalPlan, cost, err
}

// mergeContinuousSelections merges adjacent selections, which may appear after plan transformations.
func mergeContinuousSelections(p base.PhysicalPlan) {
	if sel, ok := p.(*physicalop.PhysicalSelection); ok {
		for {
			childSel := sel.Children()[0]
			tmp, ok := childSel.(*physicalop.PhysicalSelection)
			if !ok {
				break
			}
			sel.Conditions = append(sel.Conditions, tmp.Conditions...)
			sel.SetChild(0, tmp.Children()[0])
		}
	}
	for _, child := range p.Children() {
		mergeContinuousSelections(child)
	}
	// merge continuous selections in a TiFlash coprocessor task
	tableReader, isTableReader := p.(*physicalop.PhysicalTableReader)
	if isTableReader && tableReader.StoreType == kv.TiFlash {
		mergeContinuousSelections(tableReader.TablePlan)
		tableReader.TablePlans = physicalop.FlattenListPushDownPlan(tableReader.TablePlan)
	}
}
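
// Illustrative example: given a plan fragment
//
//	Selection(a > 1) -> Selection(b < 2) -> TableScan
//
// mergeContinuousSelections folds it into
//
//	Selection(a > 1, b < 2) -> TableScan
//
// by appending the child's conditions to the parent and splicing the child out.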

func postOptimize(ctx context.Context, sctx base.PlanContext, plan base.PhysicalPlan) base.PhysicalPlan {
	// Some cases from update optimization require avoiding projection elimination;
	// see the comments ahead of the DoOptimize call in buildUpdate().
	plan = eliminatePhysicalProjection(plan)
	plan = InjectExtraProjection(plan)
	mergeContinuousSelections(plan)
	plan = eliminateUnionScanAndLock(sctx, plan)
	plan = avoidColumnEvaluatorForProjBelowUnion(plan)
	plan = enableParallelApply(sctx, plan)
	handleFineGrainedShuffle(ctx, sctx, plan)
	propagateProbeParents(plan, nil)
	countStarRewrite(plan)
	disableReuseChunkIfNeeded(sctx, plan)
	generateRuntimeFilter(sctx, plan)
	return plan
}

func generateRuntimeFilter(sctx base.PlanContext, plan base.PhysicalPlan) {
	if !sctx.GetSessionVars().IsRuntimeFilterEnabled() || sctx.GetSessionVars().InRestrictedSQL {
		return
	}
	logutil.BgLogger().Debug("Start runtime filter generator")
	rfGenerator := &RuntimeFilterGenerator{
		rfIDGenerator:      &util.IDGenerator{},
		columnUniqueIDToRF: map[int64][]*physicalop.RuntimeFilter{},
		parentPhysicalPlan: plan,
	}
	startRFGenerator := time.Now()
	rfGenerator.GenerateRuntimeFilter(plan)
	logutil.BgLogger().Debug("Finish runtime filter generator",
		zap.Duration("Cost", time.Since(startRFGenerator)))
}

/*
countStarRewrite rewrites count(*) -> count(not null column).

**Only for TiFlash**

Attention:
Since count(*) is directly translated into count(1) during grammar parsing,
the rewritten pattern actually matches count(constant).

Pattern:

	PhysicalAggregation: count(constant)
	        |
	TableFullScan: TiFlash

Optimize:

	Table <k1 bool not null, k2 int null, k3 bigint not null>

	Query: select count(*) from table
	ColumnPruningRule: the datasource picks row_id
	countStarRewrite: the datasource picks k1 instead of row_id,
	and count(*) is rewritten as count(k1)

	Rewritten Query: select count(k1) from table
*/
func countStarRewrite(plan base.PhysicalPlan) {
	countStarRewriteInternal(plan)
	if tableReader, ok := plan.(*physicalop.PhysicalTableReader); ok {
		countStarRewrite(tableReader.TablePlan)
	} else {
		for _, child := range plan.Children() {
			countStarRewrite(child)
		}
	}
}

func countStarRewriteInternal(plan base.PhysicalPlan) {
	// match the pattern: any agg(count(constant)) -> tablefullscan(tiflash)
	var physicalAgg *physicalop.BasePhysicalAgg
	switch x := plan.(type) {
	case *physicalop.PhysicalHashAgg:
		physicalAgg = x.GetPointer()
	case *physicalop.PhysicalStreamAgg:
		physicalAgg = x.GetPointer()
	default:
		return
	}
	if len(physicalAgg.GroupByItems) > 0 || len(physicalAgg.Children()) != 1 {
		return
	}
	for _, aggFunc := range physicalAgg.AggFuncs {
		if aggFunc.Name != "count" || len(aggFunc.Args) != 1 || aggFunc.HasDistinct {
			return
		}
		if _, ok := aggFunc.Args[0].(*expression.Constant); !ok {
			return
		}
	}
	physicalTableScan, ok := physicalAgg.Children()[0].(*physicalop.PhysicalTableScan)
	if !ok || !physicalTableScan.IsFullScan() || physicalTableScan.StoreType != kv.TiFlash || len(physicalTableScan.Schema().Columns) != 1 {
		return
	}
	// rewrite the datasource and the agg args
	rewriteTableScanAndAggArgs(physicalTableScan, physicalAgg.AggFuncs)
}

// rewriteTableScanAndAggArgs picks the narrowest not-null column from the table.
// If there is no not-null column in the data source, the row_id or pk column is retained.
func rewriteTableScanAndAggArgs(physicalTableScan *physicalop.PhysicalTableScan, aggFuncs []*aggregation.AggFuncDesc) {
	var resultColumnInfo *model.ColumnInfo
	var resultColumn *expression.Column

	resultColumnInfo = physicalTableScan.Columns[0]
	resultColumn = physicalTableScan.Schema().Columns[0]
	// prefer a not-null column from the table
	for _, columnInfo := range physicalTableScan.Table.Columns {
		if columnInfo.FieldType.IsVarLengthType() {
			continue
		}
		if mysql.HasNotNullFlag(columnInfo.GetFlag()) {
			if columnInfo.GetFlen() < resultColumnInfo.GetFlen() {
				resultColumnInfo = columnInfo
				resultColumn = &expression.Column{
					UniqueID: physicalTableScan.SCtx().GetSessionVars().AllocPlanColumnID(),
					ID:       resultColumnInfo.ID,
					RetType:  resultColumnInfo.FieldType.Clone(),
					OrigName: fmt.Sprintf("%s.%s.%s", physicalTableScan.DBName.L, physicalTableScan.Table.Name.L, resultColumnInfo.Name),
				}
			}
		}
	}
	// table scan (row_id) -> (not null column)
	physicalTableScan.Columns[0] = resultColumnInfo
	physicalTableScan.Schema().Columns[0] = resultColumn
	// agg arg count(1) -> count(not null column)
	arg := resultColumn.Clone()
	for _, aggFunc := range aggFuncs {
		constExpr, ok := aggFunc.Args[0].(*expression.Constant)
		if !ok {
			return
		}
		// count(null) shouldn't be rewritten
		if constExpr.Value.IsNull() {
			continue
		}
		aggFunc.Args[0] = arg
	}
}

// Only for MPP (Window <- [Sort] <- ExchangeReceiver <- ExchangeSender).
// TiFlashFineGrainedShuffleStreamCount:
// < 0: fine grained shuffle is disabled.
// > 0: use TiFlashFineGrainedShuffleStreamCount as the stream count.
// == 0: use TiFlashMaxThreads as the stream count when it's greater than 0; otherwise mark the status as uninitialized.
func handleFineGrainedShuffle(ctx context.Context, sctx base.PlanContext, plan base.PhysicalPlan) {
	streamCount := sctx.GetSessionVars().TiFlashFineGrainedShuffleStreamCount
	if streamCount < 0 {
		return
	}
	if streamCount == 0 {
		if sctx.GetSessionVars().TiFlashMaxThreads > 0 {
			streamCount = sctx.GetSessionVars().TiFlashMaxThreads
		}
	}
	// use two separate cluster info items to avoid the cost of gRPC calls
	tiflashServerCountInfo := tiflashClusterInfo{unInitialized, 0}
	streamCountInfo := tiflashClusterInfo{unInitialized, 0}
	if streamCount != 0 {
		streamCountInfo.itemStatus = initialized
		streamCountInfo.itemValue = uint64(streamCount)
	}
	setupFineGrainedShuffle(ctx, sctx, &streamCountInfo, &tiflashServerCountInfo, plan)
}
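
// Illustrative summary of how handleFineGrainedShuffle resolves the stream
// count above (hypothetical values):
//
//	TiFlashFineGrainedShuffleStreamCount = -1                     -> disabled
//	TiFlashFineGrainedShuffleStreamCount = 8                      -> stream count 8
//	TiFlashFineGrainedShuffleStreamCount = 0, TiFlashMaxThreads = 16 -> stream count 16
//	both 0                                                        -> resolved lazily from the
//	                                                                 minimal TiFlash logical core count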

func setupFineGrainedShuffle(ctx context.Context, sctx base.PlanContext, streamCountInfo *tiflashClusterInfo, tiflashServerCountInfo *tiflashClusterInfo, plan base.PhysicalPlan) {
	if tableReader, ok := plan.(*physicalop.PhysicalTableReader); ok {
		if _, isExchangeSender := tableReader.TablePlan.(*physicalop.PhysicalExchangeSender); isExchangeSender {
			helper := fineGrainedShuffleHelper{shuffleTarget: unknown, plans: make([]*physicalop.BasePhysicalPlan, 1)}
			setupFineGrainedShuffleInternal(ctx, sctx, tableReader.TablePlan, &helper, streamCountInfo, tiflashServerCountInfo)
		}
	} else {
		for _, child := range plan.Children() {
			setupFineGrainedShuffle(ctx, sctx, streamCountInfo, tiflashServerCountInfo, child)
		}
	}
}

type shuffleTarget uint8

const (
	unknown shuffleTarget = iota
	window
	joinBuild
	hashAgg
)

type fineGrainedShuffleHelper struct {
	shuffleTarget shuffleTarget
	plans         []*physicalop.BasePhysicalPlan
	joinKeys      []*expression.Column
}

type tiflashClusterInfoStatus uint8

const (
	unInitialized tiflashClusterInfoStatus = iota
	initialized
	failed
)

type tiflashClusterInfo struct {
	itemStatus tiflashClusterInfoStatus
	itemValue  uint64
}

func (h *fineGrainedShuffleHelper) clear() {
	h.shuffleTarget = unknown
	h.plans = h.plans[:0]
	h.joinKeys = nil
}

func (h *fineGrainedShuffleHelper) updateTarget(t shuffleTarget, p *physicalop.BasePhysicalPlan) {
	h.shuffleTarget = t
	h.plans = append(h.plans, p)
}

func getTiFlashServerMinLogicalCores(ctx context.Context, sctx base.PlanContext, serversInfo []infoschema.ServerInfo) (bool, uint64) {
	failpoint.Inject("mockTiFlashStreamCountUsingMinLogicalCores", func(val failpoint.Value) {
		intVal, err := strconv.Atoi(val.(string))
		if err == nil {
			failpoint.Return(true, uint64(intVal))
		} else {
			failpoint.Return(false, 0)
		}
	})
	defer func(begin time.Time) {
		// if there is any network jitter, this could take a long time.
		sctx.GetSessionVars().DurationOptimizer.TiFlashInfoFetch = time.Since(begin)
	}(time.Now())
	rows, err := infoschema.FetchClusterServerInfoWithoutPrivilegeCheck(ctx, sctx.GetSessionVars(), serversInfo, diagnosticspb.ServerInfoType_HardwareInfo, false)
	if err != nil {
		return false, 0
	}
	var minLogicalCores = initialMaxCores // set to a large enough value here
	for _, row := range rows {
		if row[4].GetString() == "cpu-logical-cores" {
			logicalCpus, err := strconv.Atoi(row[5].GetString())
			if err == nil && logicalCpus > 0 {
				minLogicalCores = min(minLogicalCores, uint64(logicalCpus))
			}
		}
	}
	// No need to check len(serversInfo) == serverCount here, since missing some servers' info won't affect correctness
	return true, minLogicalCores
}

// calculateTiFlashStreamCountUsingMinLogicalCores uses the minimal number of logical CPU cores among TiFlash servers.
// It returns (false, 0) if any error happens.
func calculateTiFlashStreamCountUsingMinLogicalCores(ctx context.Context, sctx base.PlanContext, serversInfo []infoschema.ServerInfo) (bool, uint64) {
	valid, minLogicalCores := getTiFlashServerMinLogicalCores(ctx, sctx, serversInfo)
	if !valid {
		return false, 0
	}
	if minLogicalCores != initialMaxCores {
		// use the logical core count as the stream count, the same as TiFlash's default max_threads: https://github.com/pingcap/tiflash/blob/v7.5.0/dbms/src/Interpreters/SettingsCommon.h#L166
		return true, minLogicalCores
	}

	return false, 0
}

func checkFineGrainedShuffleForJoinAgg(ctx context.Context, sctx base.PlanContext, streamCountInfo *tiflashClusterInfo, tiflashServerCountInfo *tiflashClusterInfo, exchangeColCount int, splitLimit uint64) (applyFlag bool, streamCount uint64) {
	switch (*streamCountInfo).itemStatus {
	case unInitialized:
		streamCount = 4 // assume an 8c node in the cluster as the minimum; stream count is 8 / 2 = 4
	case initialized:
		streamCount = (*streamCountInfo).itemValue
	case failed:
		return false, 0 // probably won't reach this path
	}

	var tiflashServerCount uint64
	switch (*tiflashServerCountInfo).itemStatus {
	case unInitialized:
		serversInfo, err := infoschema.GetTiFlashServerInfo(sctx.GetStore())
		if err != nil {
			(*tiflashServerCountInfo).itemStatus = failed
			(*tiflashServerCountInfo).itemValue = 0
			if (*streamCountInfo).itemStatus == unInitialized {
				setDefaultStreamCount(streamCountInfo)
			}
			return false, 0
		}
		tiflashServerCount = uint64(len(serversInfo))
		(*tiflashServerCountInfo).itemStatus = initialized
		(*tiflashServerCountInfo).itemValue = tiflashServerCount
	case initialized:
		tiflashServerCount = (*tiflashServerCountInfo).itemValue
	case failed:
		return false, 0
	}

	// if the product already exceeds splitLimit, no need to fetch the actual logical cores
	if tiflashServerCount*uint64(exchangeColCount)*streamCount > splitLimit {
		return false, 0
	}

	// if streamCount is already initialized and passes the splitLimit check
	if (*streamCountInfo).itemStatus == initialized {
		return true, streamCount
	}

	serversInfo, err := infoschema.GetTiFlashServerInfo(sctx.GetStore())
	if err != nil {
		(*tiflashServerCountInfo).itemStatus = failed
		(*tiflashServerCountInfo).itemValue = 0
		return false, 0
	}
	flag, temStreamCount := calculateTiFlashStreamCountUsingMinLogicalCores(ctx, sctx, serversInfo)
	if !flag {
		setDefaultStreamCount(streamCountInfo)
		(*tiflashServerCountInfo).itemStatus = failed
		return false, 0
	}
	streamCount = temStreamCount
	(*streamCountInfo).itemStatus = initialized
	(*streamCountInfo).itemValue = streamCount
	applyFlag = tiflashServerCount*uint64(exchangeColCount)*streamCount <= splitLimit
	return applyFlag, streamCount
}
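
// Illustrative example of the splitLimit guard above (hypothetical numbers):
// with 3 TiFlash servers, an exchange of 10 hash columns, and a stream count
// of 40, the product 3*10*40 = 1200 equals the hashAgg limit of 1200 used in
// setupFineGrainedShuffleInternal, so fine grained shuffle still applies; one
// more column (3*11*40 = 1320) would exceed the limit and disable it.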

func inferFineGrainedShuffleStreamCountForWindow(ctx context.Context, sctx base.PlanContext, streamCountInfo *tiflashClusterInfo, tiflashServerCountInfo *tiflashClusterInfo) (streamCount uint64) {
	switch (*streamCountInfo).itemStatus {
	case unInitialized:
		if (*tiflashServerCountInfo).itemStatus == failed {
			setDefaultStreamCount(streamCountInfo)
			streamCount = (*streamCountInfo).itemValue
			break
		}

		serversInfo, err := infoschema.GetTiFlashServerInfo(sctx.GetStore())
		if err != nil {
			setDefaultStreamCount(streamCountInfo)
			streamCount = (*streamCountInfo).itemValue
			(*tiflashServerCountInfo).itemStatus = failed
			break
		}

		if (*tiflashServerCountInfo).itemStatus == unInitialized {
			(*tiflashServerCountInfo).itemStatus = initialized
			(*tiflashServerCountInfo).itemValue = uint64(len(serversInfo))
		}

		flag, temStreamCount := calculateTiFlashStreamCountUsingMinLogicalCores(ctx, sctx, serversInfo)
		if !flag {
			setDefaultStreamCount(streamCountInfo)
			streamCount = (*streamCountInfo).itemValue
			(*tiflashServerCountInfo).itemStatus = failed
			break
		}
		streamCount = temStreamCount
		(*streamCountInfo).itemStatus = initialized
		(*streamCountInfo).itemValue = streamCount
	case initialized:
		streamCount = (*streamCountInfo).itemValue
	case failed:
		setDefaultStreamCount(streamCountInfo)
		streamCount = (*streamCountInfo).itemValue
	}
	return streamCount
}

func setDefaultStreamCount(streamCountInfo *tiflashClusterInfo) {
	(*streamCountInfo).itemStatus = initialized
	(*streamCountInfo).itemValue = vardef.DefStreamCountWhenMaxThreadsNotSet
}

func setupFineGrainedShuffleInternal(ctx context.Context, sctx base.PlanContext, plan base.PhysicalPlan, helper *fineGrainedShuffleHelper, streamCountInfo *tiflashClusterInfo, tiflashServerCountInfo *tiflashClusterInfo) {
	switch x := plan.(type) {
	case *physicalop.PhysicalWindow:
		// Do not clear the plans because the window executor will keep the data partition.
		// For a non-hash-partition window function, there will be a passthrough ExchangeSender to collect data,
		// which will break the data partition.
		helper.updateTarget(window, &x.BasePhysicalPlan)
		setupFineGrainedShuffleInternal(ctx, sctx, x.Children()[0], helper, streamCountInfo, tiflashServerCountInfo)
	case *physicalop.PhysicalSort:
		if x.IsPartialSort {
			// Partial sort will keep the data partition.
			helper.plans = append(helper.plans, &x.BasePhysicalPlan)
		} else {
			// Global sort will break the data partition.
			helper.clear()
		}
		setupFineGrainedShuffleInternal(ctx, sctx, x.Children()[0], helper, streamCountInfo, tiflashServerCountInfo)
	case *physicalop.PhysicalSelection:
		helper.plans = append(helper.plans, &x.BasePhysicalPlan)
		setupFineGrainedShuffleInternal(ctx, sctx, x.Children()[0], helper, streamCountInfo, tiflashServerCountInfo)
	case *physicalop.PhysicalProjection:
		helper.plans = append(helper.plans, &x.BasePhysicalPlan)
		setupFineGrainedShuffleInternal(ctx, sctx, x.Children()[0], helper, streamCountInfo, tiflashServerCountInfo)
	case *physicalop.PhysicalExchangeReceiver:
		helper.plans = append(helper.plans, &x.BasePhysicalPlan)
		setupFineGrainedShuffleInternal(ctx, sctx, x.Children()[0], helper, streamCountInfo, tiflashServerCountInfo)
	case *physicalop.PhysicalHashAgg:
		// TODO: allow hash aggregation's output to still benefit from fine grained shuffle
		aggHelper := fineGrainedShuffleHelper{shuffleTarget: hashAgg, plans: []*physicalop.BasePhysicalPlan{}}
		aggHelper.plans = append(aggHelper.plans, &x.BasePhysicalPlan)
		setupFineGrainedShuffleInternal(ctx, sctx, x.Children()[0], &aggHelper, streamCountInfo, tiflashServerCountInfo)
	case *physicalop.PhysicalHashJoin:
		child0 := x.Children()[0]
		child1 := x.Children()[1]
		buildChild := child0
		probChild := child1
		joinKeys := x.LeftJoinKeys
		if x.InnerChildIdx != 0 {
			// Child1 is the build side.
			buildChild = child1
			joinKeys = x.RightJoinKeys
			probChild = child0
		}

		if len(joinKeys) > 0 && !x.CanTiFlashUseHashJoinV2(sctx) { // not a cross join, and cannot use hash join v2 in TiFlash
			buildHelper := fineGrainedShuffleHelper{shuffleTarget: joinBuild, plans: []*physicalop.BasePhysicalPlan{}}
			buildHelper.plans = append(buildHelper.plans, &x.BasePhysicalPlan)
			buildHelper.joinKeys = joinKeys
			setupFineGrainedShuffleInternal(ctx, sctx, buildChild, &buildHelper, streamCountInfo, tiflashServerCountInfo)
		} else {
			buildHelper := fineGrainedShuffleHelper{shuffleTarget: unknown, plans: []*physicalop.BasePhysicalPlan{}}
			setupFineGrainedShuffleInternal(ctx, sctx, buildChild, &buildHelper, streamCountInfo, tiflashServerCountInfo)
		}
		// don't apply fine grained shuffle for the probe side
		helper.clear()
		setupFineGrainedShuffleInternal(ctx, sctx, probChild, helper, streamCountInfo, tiflashServerCountInfo)
	case *physicalop.PhysicalExchangeSender:
		if x.ExchangeType == tipb.ExchangeType_Hash {
			// Set up the stream count for all plans based on the shuffle target type.
			var exchangeColCount = x.Schema().Len()
			switch helper.shuffleTarget {
			case window:
				streamCount := inferFineGrainedShuffleStreamCountForWindow(ctx, sctx, streamCountInfo, tiflashServerCountInfo)
				x.TiFlashFineGrainedShuffleStreamCount = streamCount
				for _, p := range helper.plans {
					p.TiFlashFineGrainedShuffleStreamCount = streamCount
				}
			case hashAgg:
				applyFlag, streamCount := checkFineGrainedShuffleForJoinAgg(ctx, sctx, streamCountInfo, tiflashServerCountInfo, exchangeColCount, 1200) // 1200: performance test result
				if applyFlag {
					x.TiFlashFineGrainedShuffleStreamCount = streamCount
					for _, p := range helper.plans {
						p.TiFlashFineGrainedShuffleStreamCount = streamCount
					}
				}
			case joinBuild:
				// Support hash join only when the shuffle hash keys equal the join keys, due to TiFlash implementations
				if len(x.HashCols) != len(helper.joinKeys) {
					break
				}
				// Check that each shuffle key equals the corresponding join key; otherwise the shuffle hash code
				// may not equal the actual join hash code due to type casts
				applyFlag := true
				for i, joinKey := range helper.joinKeys {
					if !x.HashCols[i].Col.EqualColumn(joinKey) {
						applyFlag = false
						break
					}
				}
				if !applyFlag {
					break
				}
				applyFlag, streamCount := checkFineGrainedShuffleForJoinAgg(ctx, sctx, streamCountInfo, tiflashServerCountInfo, exchangeColCount, 600) // 600: performance test result
				if applyFlag {
					x.TiFlashFineGrainedShuffleStreamCount = streamCount
					for _, p := range helper.plans {
						p.TiFlashFineGrainedShuffleStreamCount = streamCount
					}
				}
			}
		}
		// An exchange sender will break the data partition.
		helper.clear()
		setupFineGrainedShuffleInternal(ctx, sctx, x.Children()[0], helper, streamCountInfo, tiflashServerCountInfo)
	default:
		for _, child := range x.Children() {
			childHelper := fineGrainedShuffleHelper{shuffleTarget: unknown, plans: []*physicalop.BasePhysicalPlan{}}
			setupFineGrainedShuffleInternal(ctx, sctx, child, &childHelper, streamCountInfo, tiflashServerCountInfo)
		}
	}
}

// propagateProbeParents doesn't affect the execution plan; it only sets the probeParents field of a PhysicalPlan.
// It handles the inconsistency between the row count in the statsInfo and the recorded actual row count. Please
// see the comments in PhysicalPlan for details.
func propagateProbeParents(plan base.PhysicalPlan, probeParents []base.PhysicalPlan) {
	plan.SetProbeParents(probeParents)
	switch x := plan.(type) {
	case *physicalop.PhysicalApply, *physicalop.PhysicalIndexJoin, *physicalop.PhysicalIndexHashJoin,
		*physicalop.PhysicalIndexMergeJoin:
		if join, ok := plan.(interface{ GetInnerChildIdx() int }); ok {
			propagateProbeParents(plan.Children()[1-join.GetInnerChildIdx()], probeParents)

			// The core logic of this method:
			// Record every Apply and Index Join we meet in a slice, and set it in their inner children.
			newParents := make([]base.PhysicalPlan, len(probeParents), len(probeParents)+1)
			copy(newParents, probeParents)
			newParents = append(newParents, plan)
			propagateProbeParents(plan.Children()[join.GetInnerChildIdx()], newParents)
		}
	case *physicalop.PhysicalTableReader:
		propagateProbeParents(x.TablePlan, probeParents)
	case *physicalop.PhysicalIndexReader:
		propagateProbeParents(x.IndexPlan, probeParents)
	case *physicalop.PhysicalIndexLookUpReader:
		propagateProbeParents(x.IndexPlan, probeParents)
		propagateProbeParents(x.TablePlan, probeParents)
	case *physicalop.PhysicalIndexMergeReader:
		for _, pchild := range x.PartialPlansRaw {
			propagateProbeParents(pchild, probeParents)
		}
		propagateProbeParents(x.TablePlan, probeParents)
	default:
		for _, child := range plan.Children() {
			propagateProbeParents(child, probeParents)
		}
	}
}
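
// Illustrative example: for IndexJoin(outer: t1 scan, inner: IndexLookUp(t2)),
// the IndexJoin is appended to the probeParents slice of every operator on its
// inner side, so that t2's operators can later distinguish per-probe row
// counts in statsInfo from the accumulated actual row counts.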

func enableParallelApply(sctx base.PlanContext, plan base.PhysicalPlan) base.PhysicalPlan {
	if !sctx.GetSessionVars().EnableParallelApply {
		return plan
	}
	// parallel apply has three limitations:
	// 1. the parallel implementation cannot keep order yet;
	// 2. the inner child has to support clone;
	// 3. if one Apply is on the inner side of another Apply, it cannot be parallel, for example:
	//    the topology of 3 Apply operators is A1(A2, A3), which means A2 is the outer child of A1
	//    while A3 is the inner child. Then A1 and A2 can be parallel while A3 cannot.
	if apply, ok := plan.(*physicalop.PhysicalApply); ok {
		outerIdx := 1 - apply.InnerChildIdx
		noOrder := len(apply.GetChildReqProps(outerIdx).SortItems) == 0 // limitation 1
		_, err := physicalop.SafeClone(sctx, apply.Children()[apply.InnerChildIdx])
		supportClone := err == nil // limitation 2
		if noOrder && supportClone {
			apply.Concurrency = sctx.GetSessionVars().ExecutorConcurrency
		} else {
			if err != nil {
				sctx.GetSessionVars().StmtCtx.AppendWarning(errors.NewNoStackErrorf("Some apply operators can not be executed in parallel: %v", err))
			} else {
				sctx.GetSessionVars().StmtCtx.AppendWarning(errors.NewNoStackError("Some apply operators can not be executed in parallel"))
			}
		}
		// Because of limitation 3, we cannot parallelize Apply operators on this Apply's inner side,
		// so we only recurse into its outer child.
		apply.SetChild(outerIdx, enableParallelApply(sctx, apply.Children()[outerIdx]))
		return apply
	}
	for i, child := range plan.Children() {
		plan.SetChild(i, enableParallelApply(sctx, child))
	}
	return plan
}
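
// Illustrative example of limitation 3: given the topology A1(A2, A3), where
// A2 is A1's outer child and A3 its inner child, this pass may set Concurrency
// on A1 and A2 but never on A3, because the inner side of an Apply is
// re-executed per outer row and is skipped by the recursion above.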

// LogicalOptimizeTest is just exported for tests.
func LogicalOptimizeTest(ctx context.Context, flag uint64, logic base.LogicalPlan) (base.LogicalPlan, error) {
	return logicalOptimize(ctx, flag, logic)
}

func normalizeOptimize(ctx context.Context, flag uint64, logic base.LogicalPlan) (base.LogicalPlan, error) {
	var err error
	// TODO: the rule-driven normalization will be changed to be stack-driven.
	for i, rule := range normalizeRuleList {
		// The order of flags is the same as the order of optRules in the list.
		// We use a bitmask to record which opt rules should be used. If the i-th bit is 1,
		// we should apply the i-th optimizing rule.
		if flag&(1<<uint(i)) == 0 || isLogicalRuleDisabled(rule) {
			continue
		}
		logic, _, err = rule.Optimize(ctx, logic)
		if err != nil {
			return nil, err
		}
	}
	return logic, err
}

func logicalOptimize(ctx context.Context, flag uint64, logic base.LogicalPlan) (base.LogicalPlan, error) {
	defer func(begin time.Time) {
		logic.SCtx().GetSessionVars().DurationOptimizer.LogicalOpt = time.Since(begin)
	}(time.Now())

	var err error
	var againRuleList []base.LogicalOptRule
	for i, rule := range logicalRuleList {
		// The order of flags is the same as the order of optRules in the list.
		// We use a bitmask to record which opt rules should be used. If the i-th bit is 1,
		// we should apply the i-th optimizing rule.
		if flag&(1<<uint(i)) == 0 || isLogicalRuleDisabled(rule) {
			continue
		}
		var planChanged bool
		logic, planChanged, err = rule.Optimize(ctx, logic)
		if err != nil {
			return nil, err
		}
		// Collect interaction rules that should be optimized again.
		// Per the conditions documented above optInteractionRuleList, the interaction
		// rule must be enabled, i.e. not disabled.
		interactionRule, ok := optInteractionRuleList[rule]
		if planChanged && ok && !isLogicalRuleDisabled(interactionRule) {
			againRuleList = append(againRuleList, interactionRule)
		}
	}

	// Trigger the interaction rules
	for _, rule := range againRuleList {
		logic, _, err = rule.Optimize(ctx, logic)
		if err != nil {
			return nil, err
		}
	}

	return logic, err
}

func isLogicalRuleDisabled(r base.LogicalOptRule) bool {
	disabled := DefaultDisabledLogicalRulesList.Load().(set.StringSet).Exist(r.Name())
	return disabled
}

func physicalOptimize(logic base.LogicalPlan) (plan base.PhysicalPlan, cost float64, err error) {
	begin := time.Now()
	defer func() {
		logic.SCtx().GetSessionVars().DurationOptimizer.PhysicalOpt = time.Since(begin)
	}()
	if _, _, err := logic.RecursiveDeriveStats(nil); err != nil {
		return nil, 0, err
	}
	// if there are too many indexes, this process might take a relatively long time, so track its time cost.
	logic.SCtx().GetSessionVars().DurationOptimizer.StatsDerive = time.Since(begin)

	preparePossibleProperties(logic)

	prop := &property.PhysicalProperty{
		TaskTp:      property.RootTaskType,
		ExpectedCnt: math.MaxFloat64,
	}

	logic.SCtx().GetSessionVars().StmtCtx.TaskMapBakTS = 0
	t, err := physicalop.FindBestTask(logic, prop)
	if err != nil {
		return nil, 0, err
	}
	if t.Invalid() {
		errMsg := "Can't find a proper physical plan for this query"
		if config.GetGlobalConfig().DisaggregatedTiFlash && !logic.SCtx().GetSessionVars().IsMPPAllowed() {
			errMsg += ": cop and batchCop are not allowed in disaggregated tiflash mode, you should turn on tidb_allow_mpp switch"
		}
		return nil, 0, plannererrors.ErrInternal.GenWithStackByArgs(errMsg)
	}

	// collect the warnings from the task.
	logic.SCtx().GetSessionVars().StmtCtx.AppendWarnings(t.(*physicalop.RootTask).Warnings.GetWarnings())

	if err = t.Plan().ResolveIndices(); err != nil {
		return nil, 0, err
	}
	cost, err = getPlanCost(t.Plan(), property.RootTaskType, costusage.NewDefaultPlanCostOption())
	return t.Plan(), cost, err
}
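
// Illustrative note: the root property requests RootTaskType with an unbounded
// ExpectedCnt (math.MaxFloat64), i.e. the caller may consume every row. An
// operator such as LIMIT propagates a smaller ExpectedCnt to its children
// during enumeration, which lets FindBestTask favor plans that produce rows
// early instead of cheapest-total plans.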

// avoidColumnEvaluatorForProjBelowUnion sets AvoidColumnEvaluator to true for every projection operator
// that is a direct child of a Union operator.
func avoidColumnEvaluatorForProjBelowUnion(p base.PhysicalPlan) base.PhysicalPlan {
	iteratePhysicalPlan(p, func(p base.PhysicalPlan) bool {
		x, ok := p.(*physicalop.PhysicalUnionAll)
		if ok {
			for _, child := range x.Children() {
				if proj, ok := child.(*physicalop.PhysicalProjection); ok {
					proj.AvoidColumnEvaluator = true
				}
			}
		}
		return true
	})
	return p
}

// eliminateUnionScanAndLock sets the lock property for PointGet and BatchPointGet and eliminates UnionScan and Lock.
func eliminateUnionScanAndLock(sctx base.PlanContext, p base.PhysicalPlan) base.PhysicalPlan {
	var pointGet *physicalop.PointGetPlan
	var batchPointGet *physicalop.BatchPointGetPlan
	var physLock *physicalop.PhysicalLock
	var unionScan *physicalop.PhysicalUnionScan
	iteratePhysicalPlan(p, func(p base.PhysicalPlan) bool {
		if len(p.Children()) > 1 {
			return false
		}
		switch x := p.(type) {
		case *physicalop.PointGetPlan:
			pointGet = x
		case *physicalop.BatchPointGetPlan:
			batchPointGet = x
		case *physicalop.PhysicalLock:
			physLock = x
		case *physicalop.PhysicalUnionScan:
			unionScan = x
		}
		return true
	})
	if pointGet == nil && batchPointGet == nil {
		return p
	}
	if physLock == nil && unionScan == nil {
		return p
	}
	if physLock != nil {
		lock, waitTime := getLockWaitTime(sctx, physLock.Lock)
		if lock {
			if pointGet != nil {
				pointGet.Lock = lock
				pointGet.LockWaitTime = waitTime
			} else {
				batchPointGet.Lock = lock
				batchPointGet.LockWaitTime = waitTime
			}
		}
	}
	return transformPhysicalPlan(p, func(p base.PhysicalPlan) base.PhysicalPlan {
		if p == physLock {
			return p.Children()[0]
		}
		if p == unionScan {
			return p.Children()[0]
		}
		return p
	})
}

func iteratePhysicalPlan(p base.PhysicalPlan, f func(p base.PhysicalPlan) bool) {
	if !f(p) {
		return
	}
	for _, child := range p.Children() {
		iteratePhysicalPlan(child, f)
	}
}

func transformPhysicalPlan(p base.PhysicalPlan, f func(p base.PhysicalPlan) base.PhysicalPlan) base.PhysicalPlan {
	for i, child := range p.Children() {
		p.Children()[i] = transformPhysicalPlan(child, f)
	}
	return f(p)
}

func existsCartesianProduct(p base.LogicalPlan) bool {
	if join, ok := p.(*logicalop.LogicalJoin); ok && len(join.EqualConditions) == 0 {
		return join.JoinType == base.InnerJoin || join.JoinType == base.LeftOuterJoin || join.JoinType == base.RightOuterJoin
	}
	return slices.ContainsFunc(p.Children(), existsCartesianProduct)
}

// DefaultDisabledLogicalRulesList indicates the logical rules which should be banned.
var DefaultDisabledLogicalRulesList *atomic.Value

func disableReuseChunkIfNeeded(sctx base.PlanContext, plan base.PhysicalPlan) {
	if !sctx.GetSessionVars().IsAllocValid() {
		return
	}
	if disableReuseChunk, continueIterating := checkOverlongColType(sctx, plan); disableReuseChunk || !continueIterating {
		return
	}
	for _, child := range plan.Children() {
		disableReuseChunkIfNeeded(sctx, child)
	}
}

// checkOverlongColType checks whether the plan reads any overlong column type.
func checkOverlongColType(sctx base.PlanContext, plan base.PhysicalPlan) (skipReuseChunk bool, continueIterating bool) {
	if plan == nil {
		return false, false
	}
	switch plan.(type) {
	case *physicalop.PhysicalTableReader, *physicalop.PhysicalIndexReader,
		*physicalop.PhysicalIndexLookUpReader, *physicalop.PhysicalIndexMergeReader:
		if existsOverlongType(plan.Schema(), false) {
			sctx.GetSessionVars().ClearAlloc(nil, false)
			return true, false
		}
	case *physicalop.PointGetPlan:
		if existsOverlongType(plan.Schema(), true) {
			sctx.GetSessionVars().ClearAlloc(nil, false)
			return true, false
		}
	default:
		// Other physical operators do not read data, so we can continue to iterate.
		return false, true
	}
	// PhysicalReaders and PointGet are at the root; their children are nil or on the tikv/tiflash side,
	// so we can stop iterating.
	return false, false
}

var (
	// MaxMemoryLimitForOverlongType is the memory limit for the overlong-type column check.
	// Why is it not 128?
	// Because many customers allocate a portion of memory to their management programs,
	// the actual amount of usable memory does not align to 128GB.
	// TODO: We also lack test data for instances with less than 128GB of memory, so we need to plan the rules here.
	// TODO: internal SQL can be forced to use chunk reuse if we ensure the memory usage is safe.
	// TODO: We can consider Limit/TopN in the future.
	MaxMemoryLimitForOverlongType = 120 * size.GB
	maxFlenForOverlongType        = mysql.MaxBlobWidth * 2
)

// existsOverlongType checks whether any overlong-type column exists.
// If pointGet is true, we check the total flen of all columns; if it exceeds maxFlenForOverlongType,
// we disable chunk reuse.
// For a point get, there is only one row, so we can easily estimate the size.
// However, for a non-point get, there may be many rows, and it is impossible to determine the memory size used.
// Therefore, we can only forcibly skip the reuse chunk.
func existsOverlongType(schema *expression.Schema, pointGet bool) bool {
	if schema == nil {
		return false
	}
	totalFlen := 0
	for _, column := range schema.Columns {
		switch column.RetType.GetType() {
		case mysql.TypeLongBlob,
			mysql.TypeBlob, mysql.TypeJSON, mysql.TypeTiDBVectorFloat32:
			return true
		case mysql.TypeVarString, mysql.TypeVarchar, mysql.TypeTinyBlob, mysql.TypeMediumBlob:
			// If the column is a varchar and its defined length
			// is more than 1000, the column is considered a large
			// type and chunk reuse is disabled.
			if column.RetType.GetFlen() <= 1000 {
				continue
			}
			if pointGet {
				totalFlen += column.RetType.GetFlen()
				if checkOverlongTypeForPointGet(totalFlen) {
					return true
				}
				continue
			}
			return true
		}
	}
	return false
}

func checkOverlongTypeForPointGet(totalFlen int) bool {
	totalMemory, err := memory.MemTotal()
	if err != nil || totalMemory <= 0 {
		return true
	}
	if totalMemory >= MaxMemoryLimitForOverlongType {
		if totalFlen <= maxFlenForOverlongType {
			return false
		}
	}
	return true
}
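
// Illustrative example: for a point get on a table with two VARCHAR(3000)
// columns, totalFlen accumulates to 6000, which is well under
// maxFlenForOverlongType (2 * mysql.MaxBlobWidth), so chunk reuse stays
// enabled on hosts with at least 120GB of memory; a VARCHAR(500) column is
// never counted because flen <= 1000 is skipped in existsOverlongType.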