From c91bc2014a3eed4b6d9c92981feff97c465cc7ff Mon Sep 17 00:00:00 2001 From: Arenatlx <314806019@qq.com> Date: Tue, 25 Jun 2024 18:32:51 +0800 Subject: [PATCH] planner: classsify logical aggregation logic into a seperate file for later pkg move (#54187) ref pingcap/tidb#51664, ref pingcap/tidb#52714 --- pkg/expression/util.go | 27 + pkg/planner/cascades/transformation_rules.go | 4 +- pkg/planner/core/BUILD.bazel | 2 + pkg/planner/core/core_init.go | 4 + pkg/planner/core/exhaust_physical_plans.go | 93 +-- pkg/planner/core/explain.go | 19 - pkg/planner/core/initialize.go | 157 ---- pkg/planner/core/logical_aggregation.go | 763 ++++++++++++++++++ pkg/planner/core/logical_initialize.go | 171 ++++ pkg/planner/core/logical_plans.go | 244 ------ pkg/planner/core/property_cols_prune.go | 22 - .../core/rule_aggregation_push_down.go | 6 +- pkg/planner/core/rule_build_key_info.go | 30 - pkg/planner/core/rule_column_pruning.go | 125 +-- pkg/planner/core/rule_decorrelate.go | 16 - pkg/planner/core/rule_eliminate_projection.go | 17 +- pkg/planner/core/rule_predicate_push_down.go | 119 --- pkg/planner/core/stats.go | 60 -- pkg/planner/util/misc.go | 14 + pkg/planner/util/utilfuncp/BUILD.bazel | 1 + .../util/utilfuncp/func_pointer_misc.go | 20 +- 21 files changed, 1018 insertions(+), 896 deletions(-) create mode 100644 pkg/planner/core/logical_aggregation.go create mode 100644 pkg/planner/core/logical_initialize.go diff --git a/pkg/expression/util.go b/pkg/expression/util.go index 43cb1a0095..cfca9c7b6b 100644 --- a/pkg/expression/util.go +++ b/pkg/expression/util.go @@ -1865,3 +1865,30 @@ func ConstExprConsiderPlanCache(expr Expression, inPlanCache bool) bool { return false } } + +// ExprsHasSideEffects checks if any of the expressions has side effects. 
+func ExprsHasSideEffects(exprs []Expression) bool { + for _, expr := range exprs { + if ExprHasSetVarOrSleep(expr) { + return true + } + } + return false +} + +// ExprHasSetVarOrSleep checks if the expression has SetVar function or Sleep function. +func ExprHasSetVarOrSleep(expr Expression) bool { + scalaFunc, isScalaFunc := expr.(*ScalarFunction) + if !isScalaFunc { + return false + } + if scalaFunc.FuncName.L == ast.SetVar || scalaFunc.FuncName.L == ast.Sleep { + return true + } + for _, arg := range scalaFunc.GetArgs() { + if ExprHasSetVarOrSleep(arg) { + return true + } + } + return false +} diff --git a/pkg/planner/cascades/transformation_rules.go b/pkg/planner/cascades/transformation_rules.go index 9418ca32cd..2e3d84d95f 100644 --- a/pkg/planner/cascades/transformation_rules.go +++ b/pkg/planner/cascades/transformation_rules.go @@ -1165,7 +1165,7 @@ func (*MergeAdjacentProjection) OnTransform(old *memo.ExprIter) (newExprs []*mem proj := old.GetExpr().ExprNode.(*plannercore.LogicalProjection) childGroup := old.Children[0].Group child := old.Children[0].GetExpr().ExprNode.(*plannercore.LogicalProjection) - if plannercore.ExprsHasSideEffects(child.Exprs) { + if expression.ExprsHasSideEffects(child.Exprs) { return nil, false, false, nil } @@ -1517,7 +1517,7 @@ func NewRuleMergeAggregationProjection() Transformation { // Match implements Transformation interface. func (*MergeAggregationProjection) Match(old *memo.ExprIter) bool { proj := old.Children[0].GetExpr().ExprNode.(*plannercore.LogicalProjection) - return !plannercore.ExprsHasSideEffects(proj.Exprs) + return !expression.ExprsHasSideEffects(proj.Exprs) } // OnTransform implements Transformation interface. 
diff --git a/pkg/planner/core/BUILD.bazel b/pkg/planner/core/BUILD.bazel index 01929bb317..7958973d7b 100644 --- a/pkg/planner/core/BUILD.bazel +++ b/pkg/planner/core/BUILD.bazel @@ -21,6 +21,8 @@ go_library( "indexmerge_path.go", "indexmerge_unfinished_path.go", "initialize.go", + "logical_aggregation.go", + "logical_initialize.go", "logical_plan_builder.go", "logical_plans.go", "memtable_predicate_extractor.go", diff --git a/pkg/planner/core/core_init.go b/pkg/planner/core/core_init.go index 2e701a7989..74428abfa9 100644 --- a/pkg/planner/core/core_init.go +++ b/pkg/planner/core/core_init.go @@ -32,6 +32,10 @@ func init() { utilfuncp.HasMaxOneRowUtil = HasMaxOneRow utilfuncp.GetTaskPlanCost = getTaskPlanCost utilfuncp.CanPushToCopImpl = canPushToCopImpl + utilfuncp.GetStreamAggs = getStreamAggs + utilfuncp.GetHashAggs = getHashAggs + utilfuncp.PruneByItems = pruneByItems + utilfuncp.AppendCandidate4PhysicalOptimizeOp = appendCandidate4PhysicalOptimizeOp utilfuncp.PushDownTopNForBaseLogicalPlan = pushDownTopNForBaseLogicalPlan diff --git a/pkg/planner/core/exhaust_physical_plans.go b/pkg/planner/core/exhaust_physical_plans.go index da0945f087..543266ff96 100644 --- a/pkg/planner/core/exhaust_physical_plans.go +++ b/pkg/planner/core/exhaust_physical_plans.go @@ -67,19 +67,6 @@ func (p *LogicalUnionScan) ExhaustPhysicalPlans(prop *property.PhysicalProperty) return []base.PhysicalPlan{us}, true, nil } -func getMaxSortPrefix(sortCols, allCols []*expression.Column) []int { - tmpSchema := expression.NewSchema(allCols...) 
- sortColOffsets := make([]int, 0, len(sortCols)) - for _, sortCol := range sortCols { - offset := tmpSchema.ColumnIndex(sortCol) - if offset == -1 { - return sortColOffsets - } - sortColOffsets = append(sortColOffsets, offset) - } - return sortColOffsets -} - func findMaxPrefixLen(candidates [][]*expression.Column, keys []*expression.Column) int { maxLen := 0 for _, candidateKeys := range candidates { @@ -183,7 +170,7 @@ func (p *LogicalJoin) GetMergeJoin(prop *property.PhysicalProperty, schema *expr return nil } for _, lhsChildProperty := range p.LeftProperties { - offsets := getMaxSortPrefix(lhsChildProperty, leftJoinKeys) + offsets := util.GetMaxSortPrefix(lhsChildProperty, leftJoinKeys) // If not all equal conditions hit properties. We ban merge join heuristically. Because in this case, merge join // may get a very low performance. In executor, executes join results before other conditions filter it. if len(offsets) < len(leftJoinKeys) { @@ -1459,7 +1446,7 @@ func (p *LogicalJoin) constructIndexJoinInnerSideTask(dsCopTask *CopTask, ds *Da if len(groupByCols) != len(la.GroupByItems) { preferStream = false } - if la.HasDistinct() && !la.distinctArgsMeetsProperty() { + if la.HasDistinct() && !la.DistinctArgsMeetsProperty() { preferStream = false } // sort items must be the super set of group by items @@ -3180,11 +3167,6 @@ func canPushToCopImpl(lp base.LogicalPlan, storeTp kv.StoreType, considerDual bo return ret } -// CanPushToCop implements LogicalPlan interface. 
-func (la *LogicalAggregation) CanPushToCop(storeTp kv.StoreType) bool { - return la.BaseLogicalPlan.CanPushToCop(storeTp) && !la.NoCopPushDown -} - func getEnforcedStreamAggs(la *LogicalAggregation, prop *property.PhysicalProperty) []base.PhysicalPlan { if prop.IsFlashProp() { return nil @@ -3230,20 +3212,8 @@ func getEnforcedStreamAggs(la *LogicalAggregation, prop *property.PhysicalProper return enforcedAggs } -func (la *LogicalAggregation) distinctArgsMeetsProperty() bool { - for _, aggFunc := range la.AggFuncs { - if aggFunc.HasDistinct { - for _, distinctArg := range aggFunc.Args { - if !expression.Contains(la.GroupByItems, distinctArg) { - return false - } - } - } - } - return true -} - -func getStreamAggs(la *LogicalAggregation, prop *property.PhysicalProperty) []base.PhysicalPlan { +func getStreamAggs(lp base.LogicalPlan, prop *property.PhysicalProperty) []base.PhysicalPlan { + la := lp.(*LogicalAggregation) // TODO: support CopTiFlash task type in stream agg if prop.IsFlashProp() { return nil @@ -3285,7 +3255,7 @@ func getStreamAggs(la *LogicalAggregation, prop *property.PhysicalProperty) []ba // if variable doesn't allow DistinctAggPushDown, just produce root task type. // if variable does allow DistinctAggPushDown, but OP itself can't be pushed down to tikv, just produce root task type. 
taskTypes = []property.TaskType{property.RootTaskType} - } else if !la.distinctArgsMeetsProperty() { + } else if !la.DistinctArgsMeetsProperty() { continue } } else if !la.PreferAggToCop { @@ -3321,7 +3291,7 @@ func getStreamAggs(la *LogicalAggregation, prop *property.PhysicalProperty) []ba } // TODO: support more operators and distinct later -func (la *LogicalAggregation) checkCanPushDownToMPP() bool { +func checkCanPushDownToMPP(la *LogicalAggregation) bool { hasUnsupportedDistinct := false for _, agg := range la.AggFuncs { // MPP does not support distinct except count distinct now @@ -3494,17 +3464,18 @@ func tryToGetMppHashAggs(la *LogicalAggregation, prop *property.PhysicalProperty // for 2, the final result for this physical operator enumeration is chosen or rejected is according to more factors later (hint/variable/partition/virtual-col/cost) // // That is to say, the non-complete positive judgement of canPushDownToMPP/canPushDownToTiFlash/canPushDownToTiKV is not that for sure here. -func getHashAggs(la *LogicalAggregation, prop *property.PhysicalProperty) []base.PhysicalPlan { +func getHashAggs(lp base.LogicalPlan, prop *property.PhysicalProperty) []base.PhysicalPlan { + la := lp.(*LogicalAggregation) if !prop.IsSortItemEmpty() { return nil } - if prop.TaskTp == property.MppTaskType && !la.checkCanPushDownToMPP() { + if prop.TaskTp == property.MppTaskType && !checkCanPushDownToMPP(la) { return nil } hashAggs := make([]base.PhysicalPlan, 0, len(prop.GetAllPossibleChildTaskTypes())) taskTypes := []property.TaskType{property.CopSingleReadTaskType, property.CopMultiReadTaskType} canPushDownToTiFlash := la.CanPushToCop(kv.TiFlash) - canPushDownToMPP := canPushDownToTiFlash && la.SCtx().GetSessionVars().IsMPPAllowed() && la.checkCanPushDownToMPP() + canPushDownToMPP := canPushDownToTiFlash && la.SCtx().GetSessionVars().IsMPPAllowed() && checkCanPushDownToMPP(la) if la.HasDistinct() { // TODO: remove after the cost estimation of distinct pushdown is implemented. 
if !la.SCtx().GetSessionVars().AllowDistinctAggPushDown || !la.CanPushToCop(kv.TiKV) { @@ -3554,50 +3525,6 @@ func getHashAggs(la *LogicalAggregation, prop *property.PhysicalProperty) []base return hashAggs } -// ResetHintIfConflicted resets the PreferAggType if they are conflicted, -// and returns the two PreferAggType hints. -func (la *LogicalAggregation) ResetHintIfConflicted() (preferHash bool, preferStream bool) { - preferHash = (la.PreferAggType & h.PreferHashAgg) > 0 - preferStream = (la.PreferAggType & h.PreferStreamAgg) > 0 - if preferHash && preferStream { - la.SCtx().GetSessionVars().StmtCtx.SetHintWarning("Optimizer aggregation hints are conflicted") - la.PreferAggType = 0 - preferHash, preferStream = false, false - } - return -} - -// ExhaustPhysicalPlans implements LogicalPlan interface. -func (la *LogicalAggregation) ExhaustPhysicalPlans(prop *property.PhysicalProperty) ([]base.PhysicalPlan, bool, error) { - if la.PreferAggToCop { - if !la.CanPushToCop(kv.TiKV) { - la.SCtx().GetSessionVars().StmtCtx.SetHintWarning( - "Optimizer Hint AGG_TO_COP is inapplicable") - la.PreferAggToCop = false - } - } - - preferHash, preferStream := la.ResetHintIfConflicted() - - hashAggs := getHashAggs(la, prop) - if hashAggs != nil && preferHash { - return hashAggs, true, nil - } - - streamAggs := getStreamAggs(la, prop) - if streamAggs != nil && preferStream { - return streamAggs, true, nil - } - - aggs := append(hashAggs, streamAggs...) - - if streamAggs == nil && preferStream && !prop.IsSortItemEmpty() { - la.SCtx().GetSessionVars().StmtCtx.SetHintWarning("Optimizer Hint STREAM_AGG is inapplicable") - } - - return aggs, !(preferStream || preferHash), nil -} - // ExhaustPhysicalPlans implements LogicalPlan interface. 
func (p *LogicalSelection) ExhaustPhysicalPlans(prop *property.PhysicalProperty) ([]base.PhysicalPlan, bool, error) { newProps := make([]*property.PhysicalProperty, 0, 2) diff --git a/pkg/planner/core/explain.go b/pkg/planner/core/explain.go index 68d2a15db8..4d94e76a70 100644 --- a/pkg/planner/core/explain.go +++ b/pkg/planner/core/explain.go @@ -897,25 +897,6 @@ func (p *LogicalJoin) ExplainInfo() string { return buffer.String() } -// ExplainInfo implements Plan interface. -func (p *LogicalAggregation) ExplainInfo() string { - buffer := bytes.NewBufferString("") - if len(p.GroupByItems) > 0 { - fmt.Fprintf(buffer, "group by:%s, ", - expression.SortedExplainExpressionList(p.SCtx().GetExprCtx().GetEvalCtx(), p.GroupByItems)) - } - if len(p.AggFuncs) > 0 { - buffer.WriteString("funcs:") - for i, agg := range p.AggFuncs { - buffer.WriteString(aggregation.ExplainAggFunc(p.SCtx().GetExprCtx().GetEvalCtx(), agg, false)) - if i+1 < len(p.AggFuncs) { - buffer.WriteString(", ") - } - } - } - return buffer.String() -} - // ExplainInfo implements Plan interface. func (p *LogicalProjection) ExplainInfo() string { return expression.ExplainExpressionList(p.Exprs, p.Schema()) diff --git a/pkg/planner/core/initialize.go b/pkg/planner/core/initialize.go index 4ca7338142..8b53038f95 100644 --- a/pkg/planner/core/initialize.go +++ b/pkg/planner/core/initialize.go @@ -19,61 +19,12 @@ import ( "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/planner/core/base" "github.com/pingcap/tidb/pkg/planner/core/operator/baseimpl" - "github.com/pingcap/tidb/pkg/planner/core/operator/logicalop" "github.com/pingcap/tidb/pkg/planner/property" "github.com/pingcap/tidb/pkg/types" "github.com/pingcap/tidb/pkg/util/plancodec" "github.com/pingcap/tidb/pkg/util/size" ) -// Init initializes LogicalAggregation. 
-func (la LogicalAggregation) Init(ctx base.PlanContext, offset int) *LogicalAggregation { - la.BaseLogicalPlan = logicalop.NewBaseLogicalPlan(ctx, plancodec.TypeAgg, &la, offset) - return &la -} - -// Init initializes LogicalJoin. -func (p LogicalJoin) Init(ctx base.PlanContext, offset int) *LogicalJoin { - p.BaseLogicalPlan = logicalop.NewBaseLogicalPlan(ctx, plancodec.TypeJoin, &p, offset) - return &p -} - -// Init initializes DataSource. -func (ds DataSource) Init(ctx base.PlanContext, offset int) *DataSource { - ds.BaseLogicalPlan = logicalop.NewBaseLogicalPlan(ctx, plancodec.TypeDataSource, &ds, offset) - return &ds -} - -// Init initializes TiKVSingleGather. -func (sg TiKVSingleGather) Init(ctx base.PlanContext, offset int) *TiKVSingleGather { - sg.BaseLogicalPlan = logicalop.NewBaseLogicalPlan(ctx, plancodec.TypeTiKVSingleGather, &sg, offset) - return &sg -} - -// Init initializes LogicalTableScan. -func (ts LogicalTableScan) Init(ctx base.PlanContext, offset int) *LogicalTableScan { - ts.BaseLogicalPlan = logicalop.NewBaseLogicalPlan(ctx, plancodec.TypeTableScan, &ts, offset) - return &ts -} - -// Init initializes LogicalIndexScan. -func (is LogicalIndexScan) Init(ctx base.PlanContext, offset int) *LogicalIndexScan { - is.BaseLogicalPlan = logicalop.NewBaseLogicalPlan(ctx, plancodec.TypeIdxScan, &is, offset) - return &is -} - -// Init initializes LogicalApply. -func (la LogicalApply) Init(ctx base.PlanContext, offset int) *LogicalApply { - la.BaseLogicalPlan = logicalop.NewBaseLogicalPlan(ctx, plancodec.TypeApply, &la, offset) - return &la -} - -// Init initializes LogicalSelection. -func (p LogicalSelection) Init(ctx base.PlanContext, qbOffset int) *LogicalSelection { - p.BaseLogicalPlan = logicalop.NewBaseLogicalPlan(ctx, plancodec.TypeSel, &p, qbOffset) - return &p -} - // Init initializes PhysicalSelection. 
func (p PhysicalSelection) Init(ctx base.PlanContext, stats *property.StatsInfo, qbOffset int, props ...*property.PhysicalProperty) *PhysicalSelection { p.basePhysicalPlan = newBasePhysicalPlan(ctx, plancodec.TypeSel, &p, qbOffset) @@ -82,24 +33,6 @@ func (p PhysicalSelection) Init(ctx base.PlanContext, stats *property.StatsInfo, return &p } -// Init initializes LogicalUnionScan. -func (p LogicalUnionScan) Init(ctx base.PlanContext, qbOffset int) *LogicalUnionScan { - p.BaseLogicalPlan = logicalop.NewBaseLogicalPlan(ctx, plancodec.TypeUnionScan, &p, qbOffset) - return &p -} - -// Init initializes LogicalProjection. -func (p LogicalProjection) Init(ctx base.PlanContext, qbOffset int) *LogicalProjection { - p.BaseLogicalPlan = logicalop.NewBaseLogicalPlan(ctx, plancodec.TypeProj, &p, qbOffset) - return &p -} - -// Init initializes LogicalProjection. -func (p LogicalExpand) Init(ctx base.PlanContext, offset int) *LogicalExpand { - p.BaseLogicalPlan = logicalop.NewBaseLogicalPlan(ctx, plancodec.TypeExpand, &p, offset) - return &p -} - // Init initializes PhysicalProjection. func (p PhysicalProjection) Init(ctx base.PlanContext, stats *property.StatsInfo, offset int, props ...*property.PhysicalProperty) *PhysicalProjection { p.basePhysicalPlan = newBasePhysicalPlan(ctx, plancodec.TypeProj, &p, offset) @@ -108,18 +41,6 @@ func (p PhysicalProjection) Init(ctx base.PlanContext, stats *property.StatsInfo return &p } -// Init initializes LogicalUnionAll. -func (p LogicalUnionAll) Init(ctx base.PlanContext, offset int) *LogicalUnionAll { - p.BaseLogicalPlan = logicalop.NewBaseLogicalPlan(ctx, plancodec.TypeUnion, &p, offset) - return &p -} - -// Init initializes LogicalPartitionUnionAll. -func (p LogicalPartitionUnionAll) Init(ctx base.PlanContext, offset int) *LogicalPartitionUnionAll { - p.BaseLogicalPlan = logicalop.NewBaseLogicalPlan(ctx, plancodec.TypePartitionUnion, &p, offset) - return &p -} - // Init initializes PhysicalUnionAll. 
func (p PhysicalUnionAll) Init(ctx base.PlanContext, stats *property.StatsInfo, offset int, props ...*property.PhysicalProperty) *PhysicalUnionAll { p.basePhysicalPlan = newBasePhysicalPlan(ctx, plancodec.TypeUnion, &p, offset) @@ -128,12 +49,6 @@ func (p PhysicalUnionAll) Init(ctx base.PlanContext, stats *property.StatsInfo, return &p } -// Init initializes LogicalSort. -func (ls LogicalSort) Init(ctx base.PlanContext, offset int) *LogicalSort { - ls.BaseLogicalPlan = logicalop.NewBaseLogicalPlan(ctx, plancodec.TypeSort, &ls, offset) - return &ls -} - // Init initializes PhysicalSort. func (p PhysicalSort) Init(ctx base.PlanContext, stats *property.StatsInfo, offset int, props ...*property.PhysicalProperty) *PhysicalSort { p.basePhysicalPlan = newBasePhysicalPlan(ctx, plancodec.TypeSort, &p, offset) @@ -150,12 +65,6 @@ func (p NominalSort) Init(ctx base.PlanContext, stats *property.StatsInfo, offse return &p } -// Init initializes LogicalTopN. -func (lt LogicalTopN) Init(ctx base.PlanContext, offset int) *LogicalTopN { - lt.BaseLogicalPlan = logicalop.NewBaseLogicalPlan(ctx, plancodec.TypeTopN, <, offset) - return < -} - // Init initializes PhysicalTopN. func (p PhysicalTopN) Init(ctx base.PlanContext, stats *property.StatsInfo, offset int, props ...*property.PhysicalProperty) *PhysicalTopN { p.basePhysicalPlan = newBasePhysicalPlan(ctx, plancodec.TypeTopN, &p, offset) @@ -164,12 +73,6 @@ func (p PhysicalTopN) Init(ctx base.PlanContext, stats *property.StatsInfo, offs return &p } -// Init initializes LogicalLimit. -func (p LogicalLimit) Init(ctx base.PlanContext, offset int) *LogicalLimit { - p.BaseLogicalPlan = logicalop.NewBaseLogicalPlan(ctx, plancodec.TypeLimit, &p, offset) - return &p -} - // Init initializes PhysicalLimit. 
func (p PhysicalLimit) Init(ctx base.PlanContext, stats *property.StatsInfo, offset int, props ...*property.PhysicalProperty) *PhysicalLimit { p.basePhysicalPlan = newBasePhysicalPlan(ctx, plancodec.TypeLimit, &p, offset) @@ -178,12 +81,6 @@ func (p PhysicalLimit) Init(ctx base.PlanContext, stats *property.StatsInfo, off return &p } -// Init initializes LogicalTableDual. -func (p LogicalTableDual) Init(ctx base.PlanContext, offset int) *LogicalTableDual { - p.BaseLogicalPlan = logicalop.NewBaseLogicalPlan(ctx, plancodec.TypeDual, &p, offset) - return &p -} - // Init initializes PhysicalTableDual. func (p PhysicalTableDual) Init(ctx base.PlanContext, stats *property.StatsInfo, offset int) *PhysicalTableDual { p.basePhysicalPlan = newBasePhysicalPlan(ctx, plancodec.TypeDual, &p, offset) @@ -191,12 +88,6 @@ func (p PhysicalTableDual) Init(ctx base.PlanContext, stats *property.StatsInfo, return &p } -// Init initializes LogicalMaxOneRow. -func (p LogicalMaxOneRow) Init(ctx base.PlanContext, offset int) *LogicalMaxOneRow { - p.BaseLogicalPlan = logicalop.NewBaseLogicalPlan(ctx, plancodec.TypeMaxOneRow, &p, offset) - return &p -} - // Init initializes PhysicalMaxOneRow. func (p PhysicalMaxOneRow) Init(ctx base.PlanContext, stats *property.StatsInfo, offset int, props ...*property.PhysicalProperty) *PhysicalMaxOneRow { p.basePhysicalPlan = newBasePhysicalPlan(ctx, plancodec.TypeMaxOneRow, &p, offset) @@ -205,12 +96,6 @@ func (p PhysicalMaxOneRow) Init(ctx base.PlanContext, stats *property.StatsInfo, return &p } -// Init initializes LogicalWindow. -func (p LogicalWindow) Init(ctx base.PlanContext, offset int) *LogicalWindow { - p.BaseLogicalPlan = logicalop.NewBaseLogicalPlan(ctx, plancodec.TypeWindow, &p, offset) - return &p -} - // Init initializes PhysicalWindow. 
func (p PhysicalWindow) Init(ctx base.PlanContext, stats *property.StatsInfo, offset int, props ...*property.PhysicalProperty) *PhysicalWindow { p.basePhysicalPlan = newBasePhysicalPlan(ctx, plancodec.TypeWindow, &p, offset) @@ -265,18 +150,6 @@ func (p ImportInto) Init(ctx base.PlanContext) *ImportInto { return &p } -// Init initializes LogicalShow. -func (p LogicalShow) Init(ctx base.PlanContext) *LogicalShow { - p.BaseLogicalPlan = logicalop.NewBaseLogicalPlan(ctx, plancodec.TypeShow, &p, 0) - return &p -} - -// Init initializes LogicalShowDDLJobs. -func (p LogicalShowDDLJobs) Init(ctx base.PlanContext) *LogicalShowDDLJobs { - p.BaseLogicalPlan = logicalop.NewBaseLogicalPlan(ctx, plancodec.TypeShowDDLJobs, &p, 0) - return &p -} - // Init initializes PhysicalShow. func (p PhysicalShow) Init(ctx base.PlanContext) *PhysicalShow { p.basePhysicalPlan = newBasePhysicalPlan(ctx, plancodec.TypeShow, &p, 0) @@ -293,12 +166,6 @@ func (p PhysicalShowDDLJobs) Init(ctx base.PlanContext) *PhysicalShowDDLJobs { return &p } -// Init initializes LogicalLock. -func (p LogicalLock) Init(ctx base.PlanContext) *LogicalLock { - p.BaseLogicalPlan = logicalop.NewBaseLogicalPlan(ctx, plancodec.TypeLock, &p, 0) - return &p -} - // Init initializes PhysicalLock. func (p PhysicalLock) Init(ctx base.PlanContext, stats *property.StatsInfo, props ...*property.PhysicalProperty) *PhysicalLock { p.basePhysicalPlan = newBasePhysicalPlan(ctx, plancodec.TypeLock, &p, 0) @@ -319,12 +186,6 @@ func (p PhysicalIndexScan) Init(ctx base.PlanContext, offset int) *PhysicalIndex return &p } -// Init initializes LogicalMemTable. -func (p LogicalMemTable) Init(ctx base.PlanContext, offset int) *LogicalMemTable { - p.BaseLogicalPlan = logicalop.NewBaseLogicalPlan(ctx, plancodec.TypeMemTableScan, &p, offset) - return &p -} - // Init initializes PhysicalMemTable. 
func (p PhysicalMemTable) Init(ctx base.PlanContext, stats *property.StatsInfo, offset int) *PhysicalMemTable { p.basePhysicalPlan = newBasePhysicalPlan(ctx, plancodec.TypeMemTableScan, &p, offset) @@ -594,12 +455,6 @@ func flattenPushDownPlan(p base.PhysicalPlan) []base.PhysicalPlan { return plans } -// Init only assigns type and context. -func (p LogicalCTE) Init(ctx base.PlanContext, offset int) *LogicalCTE { - p.BaseLogicalPlan = logicalop.NewBaseLogicalPlan(ctx, plancodec.TypeCTE, &p, offset) - return &p -} - // Init only assigns type and context. func (p PhysicalCTE) Init(ctx base.PlanContext, stats *property.StatsInfo) *PhysicalCTE { p.basePhysicalPlan = newBasePhysicalPlan(ctx, plancodec.TypeCTE, &p, 0) @@ -607,12 +462,6 @@ func (p PhysicalCTE) Init(ctx base.PlanContext, stats *property.StatsInfo) *Phys return &p } -// Init only assigns type and context. -func (p LogicalCTETable) Init(ctx base.PlanContext, offset int) *LogicalCTETable { - p.BaseLogicalPlan = logicalop.NewBaseLogicalPlan(ctx, plancodec.TypeCTETable, &p, offset) - return &p -} - // Init only assigns type and context. 
func (p PhysicalCTETable) Init(ctx base.PlanContext, stats *property.StatsInfo) *PhysicalCTETable { p.Plan = baseimpl.NewBasePlan(ctx, plancodec.TypeCTETable, 0) @@ -634,12 +483,6 @@ func (p FKCascade) Init(ctx base.PlanContext) *FKCascade { return &p } -// Init initializes LogicalSequence -func (p LogicalSequence) Init(ctx base.PlanContext, offset int) *LogicalSequence { - p.BaseLogicalPlan = logicalop.NewBaseLogicalPlan(ctx, plancodec.TypeSequence, &p, offset) - return &p -} - // Init initializes PhysicalSequence func (p PhysicalSequence) Init(ctx base.PlanContext, stats *property.StatsInfo, blockOffset int, props ...*property.PhysicalProperty) *PhysicalSequence { p.basePhysicalPlan = newBasePhysicalPlan(ctx, plancodec.TypeSequence, &p, blockOffset) diff --git a/pkg/planner/core/logical_aggregation.go b/pkg/planner/core/logical_aggregation.go new file mode 100644 index 0000000000..e548c536b0 --- /dev/null +++ b/pkg/planner/core/logical_aggregation.go @@ -0,0 +1,763 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package core + +import ( + "bytes" + "fmt" + + "github.com/pingcap/tidb/pkg/expression" + "github.com/pingcap/tidb/pkg/expression/aggregation" + "github.com/pingcap/tidb/pkg/kv" + "github.com/pingcap/tidb/pkg/parser/ast" + "github.com/pingcap/tidb/pkg/planner/cardinality" + "github.com/pingcap/tidb/pkg/planner/core/base" + "github.com/pingcap/tidb/pkg/planner/core/operator/logicalop" + fd "github.com/pingcap/tidb/pkg/planner/funcdep" + "github.com/pingcap/tidb/pkg/planner/property" + "github.com/pingcap/tidb/pkg/planner/util" + "github.com/pingcap/tidb/pkg/planner/util/optimizetrace" + "github.com/pingcap/tidb/pkg/planner/util/optimizetrace/logicaltrace" + "github.com/pingcap/tidb/pkg/planner/util/utilfuncp" + h "github.com/pingcap/tidb/pkg/util/hint" + "github.com/pingcap/tidb/pkg/util/intset" + "github.com/pingcap/tidb/pkg/util/plancodec" +) + +// LogicalAggregation represents an aggregate plan. +type LogicalAggregation struct { + logicalop.LogicalSchemaProducer + + AggFuncs []*aggregation.AggFuncDesc + GroupByItems []expression.Expression + + // PreferAggType And PreferAggToCop stores aggregation hint information. + PreferAggType uint + PreferAggToCop bool + + PossibleProperties [][]*expression.Column + InputCount float64 // InputCount is the input count of this plan. + + // NoCopPushDown indicates if planner must not push this agg down to coprocessor. + // It is true when the agg is in the outer child tree of apply. + NoCopPushDown bool +} + +// Init initializes LogicalAggregation. +func (la LogicalAggregation) Init(ctx base.PlanContext, offset int) *LogicalAggregation { + la.BaseLogicalPlan = logicalop.NewBaseLogicalPlan(ctx, plancodec.TypeAgg, &la, offset) + return &la +} + +// *************************** start implementation of Plan interface *************************** + +// ExplainInfo implements base.Plan.<4th> interface. 
+func (la *LogicalAggregation) ExplainInfo() string { + buffer := bytes.NewBufferString("") + if len(la.GroupByItems) > 0 { + fmt.Fprintf(buffer, "group by:%s, ", + expression.SortedExplainExpressionList(la.SCtx().GetExprCtx().GetEvalCtx(), la.GroupByItems)) + } + if len(la.AggFuncs) > 0 { + buffer.WriteString("funcs:") + for i, agg := range la.AggFuncs { + buffer.WriteString(aggregation.ExplainAggFunc(la.SCtx().GetExprCtx().GetEvalCtx(), agg, false)) + if i+1 < len(la.AggFuncs) { + buffer.WriteString(", ") + } + } + } + return buffer.String() +} + +// ReplaceExprColumns implements base.Plan.<5th> interface. +func (la *LogicalAggregation) ReplaceExprColumns(replace map[string]*expression.Column) { + for _, agg := range la.AggFuncs { + for _, aggExpr := range agg.Args { + ResolveExprAndReplace(aggExpr, replace) + } + for _, orderExpr := range agg.OrderByItems { + ResolveExprAndReplace(orderExpr.Expr, replace) + } + } + for _, gbyItem := range la.GroupByItems { + ResolveExprAndReplace(gbyItem, replace) + } +} + +// *************************** end implementation of Plan interface *************************** + +// *************************** start implementation of logicalPlan interface *************************** + +// HashCode inherits BaseLogicalPlan.LogicalPlan.<0th> implementation. + +// PredicatePushDown implements base.LogicalPlan.<1st> interface. +func (la *LogicalAggregation) PredicatePushDown(predicates []expression.Expression, opt *optimizetrace.LogicalOptimizeOp) ([]expression.Expression, base.LogicalPlan) { + condsToPush, ret := la.splitCondForAggregation(predicates) + la.BaseLogicalPlan.PredicatePushDown(condsToPush, opt) + return ret, la +} + +// PruneColumns implements base.LogicalPlan.<2nd> interface. 
+func (la *LogicalAggregation) PruneColumns(parentUsedCols []*expression.Column, opt *optimizetrace.LogicalOptimizeOp) (base.LogicalPlan, error) {
+	child := la.Children()[0]
+	// used[i] is consulted below to decide whether output column i and its aggregate function can be dropped.
+	used := expression.GetUsedList(la.SCtx().GetExprCtx().GetEvalCtx(), parentUsedCols, la.Schema())
+	prunedColumns := make([]*expression.Column, 0)
+	prunedFunctions := make([]*aggregation.AggFuncDesc, 0)
+	prunedGroupByItems := make([]expression.Expression, 0)
+
+	// allFirstRow: every aggregate function present before pruning is first_row.
+	// allRemainFirstRow: every aggregate function that survives pruning is first_row.
+	allFirstRow := true
+	allRemainFirstRow := true
+	// Walk from the last function to the first so that removing entry i in place
+	// does not shift the entries that are still to be visited.
+	for i := len(used) - 1; i >= 0; i-- {
+		if la.AggFuncs[i].Name != ast.AggFuncFirstRow {
+			allFirstRow = false
+		}
+		// Drop the function and its output column only when the parent does not use it
+		// and its arguments carry no side effects.
+		if !used[i] && !expression.ExprsHasSideEffects(la.AggFuncs[i].Args) {
+			prunedColumns = append(prunedColumns, la.Schema().Columns[i])
+			prunedFunctions = append(prunedFunctions, la.AggFuncs[i])
+			la.Schema().Columns = append(la.Schema().Columns[:i], la.Schema().Columns[i+1:]...)
+			la.AggFuncs = append(la.AggFuncs[:i], la.AggFuncs[i+1:]...)
+		} else if la.AggFuncs[i].Name != ast.AggFuncFirstRow {
+			allRemainFirstRow = false
+		}
+	}
+	logicaltrace.AppendColumnPruneTraceStep(la, prunedColumns, opt)
+	logicaltrace.AppendFunctionPruneTraceStep(la, prunedFunctions, opt)
+	// selfUsedCols collects the columns this operator still needs from its child:
+	// the columns of the remaining aggregate arguments plus the columns returned by PruneByItems.
+	//nolint: prealloc
+	var selfUsedCols []*expression.Column
+	for _, aggrFunc := range la.AggFuncs {
+		selfUsedCols = expression.ExtractColumnsFromExpressions(selfUsedCols, aggrFunc.Args, nil)
+
+		var cols []*expression.Column
+		aggrFunc.OrderByItems, cols = utilfuncp.PruneByItems(la, aggrFunc.OrderByItems, opt)
+		selfUsedCols = append(selfUsedCols, cols...)
+	}
+	if len(la.AggFuncs) == 0 || (!allFirstRow && allRemainFirstRow) {
+		// If all the aggregate functions are pruned, we should add an aggregate function to maintain the info of row numbers.
+		// For all the aggregate functions except `first_row`, if we have an empty table defined as t(a,b),
+		// `select agg(a) from t` would always return one row, while `select agg(a) from t group by b` would return empty.
+		// For `first_row` which is only used internally by tidb, `first_row(a)` would always return empty for empty input now.
+		var err error
+		var newAgg *aggregation.AggFuncDesc
+		// The placeholder is first_row(1) when every original function was first_row, count(1) otherwise.
+		if allFirstRow {
+			newAgg, err = aggregation.NewAggFuncDesc(la.SCtx().GetExprCtx(), ast.AggFuncFirstRow, []expression.Expression{expression.NewOne()}, false)
+		} else {
+			newAgg, err = aggregation.NewAggFuncDesc(la.SCtx().GetExprCtx(), ast.AggFuncCount, []expression.Expression{expression.NewOne()}, false)
+		}
+		if err != nil {
+			return nil, err
+		}
+		la.AggFuncs = append(la.AggFuncs, newAgg)
+		// Keep the schema in step with AggFuncs: one output column per aggregate function.
+		col := &expression.Column{
+			UniqueID: la.SCtx().GetSessionVars().AllocPlanColumnID(),
+			RetType:  newAgg.RetTp,
+		}
+		la.Schema().Columns = append(la.Schema().Columns, col)
+	}
+
+	if len(la.GroupByItems) > 0 {
+		// Remove group-by items that reference no column and contain no SetVar/Sleep call;
+		// the columns of every kept item are still required from the child.
+		for i := len(la.GroupByItems) - 1; i >= 0; i-- {
+			cols := expression.ExtractColumns(la.GroupByItems[i])
+			if len(cols) == 0 && !expression.ExprHasSetVarOrSleep(la.GroupByItems[i]) {
+				prunedGroupByItems = append(prunedGroupByItems, la.GroupByItems[i])
+				la.GroupByItems = append(la.GroupByItems[:i], la.GroupByItems[i+1:]...)
+			} else {
+				selfUsedCols = append(selfUsedCols, cols...)
+			}
+		}
+		// If all the group by items are pruned, we should add a constant 1 to keep the correctness.
+		// Because `select count(*) from t` is different from `select count(*) from t group by 1`.
+		if len(la.GroupByItems) == 0 {
+			la.GroupByItems = []expression.Expression{expression.NewOne()}
+		}
+	}
+	logicaltrace.AppendGroupByItemsPruneTraceStep(la, prunedGroupByItems, opt)
+	// Prune the child with the columns collected above.
+	var err error
+	la.Children()[0], err = child.PruneColumns(selfUsedCols, opt)
+	if err != nil {
+		return nil, err
+	}
+	// update children[0]
+	child = la.Children()[0]
+	// Do an extra Projection Elimination here. This is specially for empty Projection below Aggregation.
+	// This kind of Projection would cause some bugs for MPP plan and is safe to be removed.
+	// This kind of Projection should be removed in Projection Elimination, but currently PruneColumnsAgain is
+	// the last rule. So we specially handle this case here.
+	if childProjection, isProjection := child.(*LogicalProjection); isProjection {
+		if len(childProjection.Exprs) == 0 && childProjection.Schema().Len() == 0 {
+			childOfChild := childProjection.Children()[0]
+			la.SetChildren(childOfChild)
+		}
+	}
+	return la, nil
+}
+
+// FindBestTask inherits BaseLogicalPlan.LogicalPlan.<3rd> implementation.
+
+// BuildKeyInfo implements base.LogicalPlan.<4th> interface.
+func (la *LogicalAggregation) BuildKeyInfo(selfSchema *expression.Schema, childSchema []*expression.Schema) {
+	// According to the issue#46962, we can ignore the judgment of partial agg
+	// Sometimes, the agg inside of subquery and there is a true condition in where clause, the agg function is empty.
+	// For example, ``` select xxxx from xxx WHERE TRUE = ALL ( SELECT TRUE GROUP BY 1 LIMIT 1 ) IS NULL IS NOT NULL;
+	// In this case, the agg is complete mode and we can ignore this check.
+	// The length check must come first: IsPartialModeAgg reads AggFuncs[0].
+	if len(la.AggFuncs) != 0 && la.IsPartialModeAgg() {
+		return
+	}
+	la.LogicalSchemaProducer.BuildKeyInfo(selfSchema, childSchema)
+	la.buildSelfKeyInfo(selfSchema)
+}
+
+// PushDownTopN inherits BaseLogicalPlan.LogicalPlan.<5th> implementation.
+
+// DeriveTopN inherits BaseLogicalPlan.LogicalPlan.<6th> interface.
+
+// PredicateSimplification inherits BaseLogicalPlan.LogicalPlan.<7th> implementation.
+
+// ConstantPropagation inherits BaseLogicalPlan.LogicalPlan.<8th> implementation.
+
+// PullUpConstantPredicates inherits BaseLogicalPlan.LogicalPlan.<9th> implementation.
+
+// RecursiveDeriveStats inherits BaseLogicalPlan.LogicalPlan.<10th> implementation.
+
+// DeriveStats implements base.LogicalPlan.<11th> interface.
+// The row count is the estimated NDV of the columns referenced by the group-by items, and the
+// same NDV is used for every output column. When stats are already set, only GroupNDVs is refreshed.
+func (la *LogicalAggregation) DeriveStats(childStats []*property.StatsInfo, selfSchema *expression.Schema, childSchema []*expression.Schema, colGroups [][]*expression.Column) (*property.StatsInfo, error) {
+	childProfile := childStats[0]
+	// gbyCols holds every column referenced by a group-by item, not only items that are plain columns.
+	gbyCols := make([]*expression.Column, 0, len(la.GroupByItems))
+	for _, gbyExpr := range la.GroupByItems {
+		cols := expression.ExtractColumns(gbyExpr)
+		gbyCols = append(gbyCols, cols...)
+	}
+	if la.StatsInfo() != nil {
+		// Reload GroupNDVs since colGroups may have changed.
+		la.StatsInfo().GroupNDVs = la.getGroupNDVs(colGroups, childProfile, gbyCols)
+		return la.StatsInfo(), nil
+	}
+	// The second return value of EstimateColsNDVWithMatchedLen is not needed here.
+	ndv, _ := cardinality.EstimateColsNDVWithMatchedLen(gbyCols, childSchema[0], childProfile)
+	la.SetStats(&property.StatsInfo{
+		RowCount: ndv,
+		ColNDVs:  make(map[int64]float64, selfSchema.Len()),
+	})
+	// We cannot estimate the ColNDVs for every output, so we use a conservative strategy.
+	for _, col := range selfSchema.Columns {
+		la.StatsInfo().ColNDVs[col.UniqueID] = ndv
+	}
+	la.InputCount = childProfile.RowCount
+	la.StatsInfo().GroupNDVs = la.getGroupNDVs(colGroups, childProfile, gbyCols)
+	return la.StatsInfo(), nil
+}
+
+// ExtractColGroups implements base.LogicalPlan.<12th> interface.
+// It returns a single sorted column group when the group-by items reference more than one column, and nil otherwise.
+func (la *LogicalAggregation) ExtractColGroups(_ [][]*expression.Column) [][]*expression.Column {
+	// Parent colGroups would be discarded, because aggregation would make NDV of colGroups
+	// which does not match GroupByItems invalid.
+	// Note that gbyCols may not be the exact GROUP BY columns, e.g, GROUP BY a+b,
+	// but we have no other approaches for the NDV estimation of these cases
+	// except for using the independent assumption, unless we can use stats of expression index.
+	gbyCols := make([]*expression.Column, 0, len(la.GroupByItems))
+	for _, gbyExpr := range la.GroupByItems {
+		cols := expression.ExtractColumns(gbyExpr)
+		gbyCols = append(gbyCols, cols...)
+	}
+	if len(gbyCols) > 1 {
+		return [][]*expression.Column{expression.SortColumns(gbyCols)}
+	}
+	return nil
+}
+
+// PreparePossibleProperties implements base.LogicalPlan.<13th> interface.
+// It stores the result in la.PossibleProperties. Note that with no group-by item it stores a
+// slice holding one nil property but returns nil.
+func (la *LogicalAggregation) PreparePossibleProperties(_ *expression.Schema, childrenProperties ...[][]*expression.Column) [][]*expression.Column {
+	childProps := childrenProperties[0]
+	// If there's no group-by item, the stream aggregation could have no order property. So we can add an empty property
+	// when its group-by item is empty.
+	if len(la.GroupByItems) == 0 {
+		la.PossibleProperties = [][]*expression.Column{nil}
+		return nil
+	}
+	resultProperties := make([][]*expression.Column, 0, len(childProps))
+	groupByCols := la.GetGroupByCols()
+	for _, possibleChildProperty := range childProps {
+		// Keep a child property only when GetMaxSortPrefix returns one offset per group-by column;
+		// the kept property is its first len(groupByCols) columns.
+		sortColOffsets := util.GetMaxSortPrefix(possibleChildProperty, groupByCols)
+		if len(sortColOffsets) == len(groupByCols) {
+			prop := possibleChildProperty[:len(groupByCols)]
+			resultProperties = append(resultProperties, prop)
+		}
+	}
+	la.PossibleProperties = resultProperties
+	return resultProperties
+}
+
+// ExhaustPhysicalPlans implements base.LogicalPlan.<14th> interface.
+func (la *LogicalAggregation) ExhaustPhysicalPlans(prop *property.PhysicalProperty) ([]base.PhysicalPlan, bool, error) {
+	// Drop the AGG_TO_COP preference, with a warning, when the aggregation cannot be pushed to TiKV.
+	if la.PreferAggToCop {
+		if !la.CanPushToCop(kv.TiKV) {
+			la.SCtx().GetSessionVars().StmtCtx.SetHintWarning(
+				"Optimizer Hint AGG_TO_COP is inapplicable")
+			la.PreferAggToCop = false
+		}
+	}
+
+	preferHash, preferStream := la.ResetHintIfConflicted()
+
+	// A preferred algorithm that produced candidates is returned alone, with the flag set to true.
+	hashAggs := utilfuncp.GetHashAggs(la, prop)
+	if hashAggs != nil && preferHash {
+		return hashAggs, true, nil
+	}
+
+	streamAggs := utilfuncp.GetStreamAggs(la, prop)
+	if streamAggs != nil && preferStream {
+		return streamAggs, true, nil
+	}
+
+	aggs := append(hashAggs, streamAggs...)
+
+	if streamAggs == nil && preferStream && !prop.IsSortItemEmpty() {
+		la.SCtx().GetSessionVars().StmtCtx.SetHintWarning("Optimizer Hint STREAM_AGG is inapplicable")
+	}
+
+	// Otherwise return every candidate; the flag is true only when neither preference was set.
+	return aggs, !(preferStream || preferHash), nil
+}
+
+// ExtractCorrelatedCols implements base.LogicalPlan.<15th> interface.
+// It collects correlated columns from the group-by items and from the arguments and
+// order-by items of every aggregate function.
+func (la *LogicalAggregation) ExtractCorrelatedCols() []*expression.CorrelatedColumn {
+	corCols := make([]*expression.CorrelatedColumn, 0, len(la.GroupByItems)+len(la.AggFuncs))
+	for _, expr := range la.GroupByItems {
+		corCols = append(corCols, expression.ExtractCorColumns(expr)...)
+	}
+	for _, fun := range la.AggFuncs {
+		for _, arg := range fun.Args {
+			corCols = append(corCols, expression.ExtractCorColumns(arg)...)
+		}
+		for _, arg := range fun.OrderByItems {
+			corCols = append(corCols, expression.ExtractCorColumns(arg.Expr)...)
+		}
+	}
+	return corCols
+}
+
+// MaxOneRow inherits BaseLogicalPlan.LogicalPlan.<16th> implementation.
+
+// Children inherits BaseLogicalPlan.LogicalPlan.<17th> implementation.
+
+// SetChildren inherits BaseLogicalPlan.LogicalPlan.<18th> implementation.
+
+// SetChild inherits BaseLogicalPlan.LogicalPlan.<19th> implementation.
+
+// RollBackTaskMap inherits BaseLogicalPlan.LogicalPlan.<20th> implementation.
+
+// CanPushToCop implements base.LogicalPlan.<21st> interface.
+// The base check must pass and NoCopPushDown must be unset.
+func (la *LogicalAggregation) CanPushToCop(storeTp kv.StoreType) bool {
+	return la.BaseLogicalPlan.CanPushToCop(storeTp) && !la.NoCopPushDown
+}
+
+// ExtractFD implements base.LogicalPlan.<22nd> interface.
+// 1:
+// In most of the cases, using FDs to check the only_full_group_by problem should be done in the buildAggregation phase
+// by extracting the bottom-up FDs graph from the `p` --- the sub plan tree that has already been built.
+//
+// 2:
+// and this requires that some conditions push-down into the `p` like selection should be done before building aggregation,
+// otherwise, 'a=1 and a can occur in the select lists of a group by' will be miss-checked because it doesn't be implied in the known FDs graph.
+//
+// 3:
+// when a logical agg is built, it's schema columns indicates what the permitted-non-agg columns is. Therefore, we shouldn't
+// depend on logicalAgg.ExtractFD() to finish the only_full_group_by checking problem rather than by 1 & 2.
+func (la *LogicalAggregation) ExtractFD() *fd.FDSet {
+	// basically extract the children's fdSet.
+	fds := la.LogicalSchemaProducer.ExtractFD()
+	// collect the output columns' unique ID.
+	outputColsUniqueIDs := intset.NewFastIntSet()
+	notnullColsUniqueIDs := intset.NewFastIntSet()
+	groupByColsUniqueIDs := intset.NewFastIntSet()
+	groupByColsOutputCols := intset.NewFastIntSet()
+	// Since the aggregation is build ahead of projection, the latter one will reuse the column with UniqueID allocated in aggregation
+	// via aggMapper, so we don't need unnecessarily maintain the mapping in the FDSet like expr did, just treating
+	// it as normal column.
+	for _, one := range la.Schema().Columns {
+		outputColsUniqueIDs.Insert(int(one.UniqueID))
+	}
+	// For one like sum(a), we don't need to build functional dependency from a --> sum(a), cause it's only determined by the
+	// group-by-item (group-by-item --> sum(a)).
+	for _, expr := range la.GroupByItems {
+		switch x := expr.(type) {
+		case *expression.Column:
+			groupByColsUniqueIDs.Insert(int(x.UniqueID))
+		case *expression.CorrelatedColumn:
+			// shouldn't be here, intercepted by plan builder as unknown column.
+			continue
+		case *expression.Constant:
+			// shouldn't be here, interpreted as pos param by plan builder.
+			continue
+		case *expression.ScalarFunction:
+			hashCode := string(x.HashCode())
+			var (
+				ok             bool
+				scalarUniqueID int
+			)
+			if scalarUniqueID, ok = fds.IsHashCodeRegistered(hashCode); ok {
+				groupByColsUniqueIDs.Insert(scalarUniqueID)
+			} else {
+				// retrieve unique plan column id. 1: completely new one, allocating new unique id. 2: registered by projection earlier, using it.
+				if scalarUniqueID, ok = la.SCtx().GetSessionVars().MapHashCode2UniqueID4ExtendedCol[hashCode]; !ok {
+					scalarUniqueID = int(la.SCtx().GetSessionVars().AllocPlanColumnID())
+				}
+				fds.RegisterUniqueID(hashCode, scalarUniqueID)
+				groupByColsUniqueIDs.Insert(scalarUniqueID)
+			}
+			// The scalar group-by item is strictly determined by the columns it references.
+			determinants := intset.NewFastIntSet()
+			extractedColumns := expression.ExtractColumns(x)
+			extractedCorColumns := expression.ExtractCorColumns(x)
+			for _, one := range extractedColumns {
+				determinants.Insert(int(one.UniqueID))
+				groupByColsOutputCols.Insert(int(one.UniqueID))
+			}
+			for _, one := range extractedCorColumns {
+				determinants.Insert(int(one.UniqueID))
+				groupByColsOutputCols.Insert(int(one.UniqueID))
+			}
+			notnull := util.IsNullRejected(la.SCtx(), la.Schema(), x)
+			if notnull || determinants.SubsetOf(fds.NotNullCols) {
+				notnullColsUniqueIDs.Insert(scalarUniqueID)
+			}
+			fds.AddStrictFunctionalDependency(determinants, intset.NewFastIntSet(scalarUniqueID))
+		}
+	}
+
+	// Some details:
+	// For now, select max(a) from t group by c, tidb will see `max(a)` as Max aggDes and `a,b,c` as firstRow aggDes,
+	// and keep them all in the schema columns before projection does the pruning. If we build the fake FD eg: {c} ~~> {b}
+	// here since we have seen b as firstRow aggDes, for the upper layer projection of `select max(a), b from t group by c`,
+	// it will take b as valid projection field of group by statement since it has existed in the FD with {c} ~~> {b}.
+	//
+	// and since any_value will NOT be pushed down to agg schema, which means every firstRow aggDes in the agg logical operator
+	// is meaningless to build the FD with. Let's only store the non-firstRow FD down: {group by items} ~~> {real aggDes}
+	realAggFuncUniqueID := intset.NewFastIntSet()
+	for i, aggDes := range la.AggFuncs {
+		// Use the shared constant, as PruneColumns does, instead of a string literal.
+		if aggDes.Name != ast.AggFuncFirstRow {
+			realAggFuncUniqueID.Insert(int(la.Schema().Columns[i].UniqueID))
+		}
+	}
+
+	// apply operator's characteristic's FD setting.
+	if len(la.GroupByItems) == 0 {
+		// 1: as the details shown above, output cols (normal column seen as firstrow) of group by are not validated.
+		// we couldn't merge them as constant FD with origin constant FD together before projection done.
+		// fds.MaxOneRow(outputColsUniqueIDs.Union(groupByColsOutputCols))
+		//
+		// 2: for the convenience of later judgement, when there is no group by items, we will store a FD: {0} -> {real aggDes}
+		// 0 unique id is only used for here.
+		groupByColsUniqueIDs.Insert(0)
+		for i, ok := realAggFuncUniqueID.Next(0); ok; i, ok = realAggFuncUniqueID.Next(i + 1) {
+			fds.AddStrictFunctionalDependency(groupByColsUniqueIDs, intset.NewFastIntSet(i))
+		}
+	} else {
+		// eliminating input columns that are un-projected.
+		fds.ProjectCols(outputColsUniqueIDs.Union(groupByColsOutputCols).Union(groupByColsUniqueIDs))
+
+		// note: {a} --> {b,c} is not same with {a} --> {b} and {a} --> {c}
+		for i, ok := realAggFuncUniqueID.Next(0); ok; i, ok = realAggFuncUniqueID.Next(i + 1) {
+			// group by phrase always produce strict FD.
+			// 1: it can always distinguish and group the all-null/part-null group column rows.
+			// 2: the rows with all/part null group column are unique row after group operation.
+			// 3: there won't be two same group key with different agg values, so strict FD secured.
+			fds.AddStrictFunctionalDependency(groupByColsUniqueIDs, intset.NewFastIntSet(i))
+		}
+
+		// agg funcDes has been tag not null flag when building aggregation.
+		fds.MakeNotNull(notnullColsUniqueIDs)
+	}
+	fds.GroupByCols = groupByColsUniqueIDs
+	fds.HasAggBuilt = true
+	// just trace it down in every operator for test checking.
+	la.SetFDs(fds)
+	return fds
+}
+
+// GetBaseLogicalPlan inherits BaseLogicalPlan.LogicalPlan.<23rd> implementation.
+
+// ConvertOuterToInnerJoin inherits BaseLogicalPlan.LogicalPlan.<24th> implementation.
+
+// *************************** end implementation of logicalPlan interface ***************************
+
+// HasDistinct shows whether LogicalAggregation has functions with distinct.
+func (la *LogicalAggregation) HasDistinct() bool {
+	for _, aggFunc := range la.AggFuncs {
+		if aggFunc.HasDistinct {
+			return true
+		}
+	}
+	return false
+}
+
+// HasOrderBy shows whether LogicalAggregation has functions with order-by items.
+func (la *LogicalAggregation) HasOrderBy() bool {
+	for _, aggFunc := range la.AggFuncs {
+		if len(aggFunc.OrderByItems) > 0 {
+			return true
+		}
+	}
+	return false
+}
+
+// CopyAggHints copies the aggHints from another LogicalAggregation.
+func (la *LogicalAggregation) CopyAggHints(agg *LogicalAggregation) {
+	// TODO: Copy the hint may make the un-applicable hint throw the
+	// same warning message more than once. We'd better add a flag for
+	// `HaveThrownWarningMessage` to avoid this. Besides, finalAgg and
+	// partialAgg (in cascades planner) should share the same hint, instead
+	// of a copy.
+	la.PreferAggType = agg.PreferAggType
+	la.PreferAggToCop = agg.PreferAggToCop
+}
+
+// IsPartialModeAgg returns if all of the AggFuncs are partialMode.
+// An aggregation with no AggFuncs is reported as not partial instead of panicking.
+func (la *LogicalAggregation) IsPartialModeAgg() bool {
+	// AggFuncs can be empty (BuildKeyInfo guards for that case), so do not index blindly.
+	if len(la.AggFuncs) == 0 {
+		return false
+	}
+	// Since all of the AggFunc share the same AggMode, we only need to check the first one.
+	return la.AggFuncs[0].Mode == aggregation.Partial1Mode
+}
+
+// IsCompleteModeAgg returns if all of the AggFuncs are CompleteMode.
+// An aggregation with no AggFuncs is reported as complete mode, matching how BuildKeyInfo
+// treats the empty case, instead of panicking.
+func (la *LogicalAggregation) IsCompleteModeAgg() bool {
+	if len(la.AggFuncs) == 0 {
+		return true
+	}
+	// Since all of the AggFunc share the same AggMode, we only need to check the first one.
+	return la.AggFuncs[0].Mode == aggregation.CompleteMode
+}
+
+// GetGroupByCols returns the columns that are group-by items.
+// For example, `group by a, b, c+d` will return [a, b].
+func (la *LogicalAggregation) GetGroupByCols() []*expression.Column {
+	groupByCols := make([]*expression.Column, 0, len(la.GroupByItems))
+	for _, item := range la.GroupByItems {
+		if col, ok := item.(*expression.Column); ok {
+			groupByCols = append(groupByCols, col)
+		}
+	}
+	return groupByCols
+}
+
+// GetPotentialPartitionKeys return potential partition keys for aggregation, the potential partition keys are the group by keys
+func (la *LogicalAggregation) GetPotentialPartitionKeys() []*property.MPPPartitionColumn {
+	groupByCols := make([]*property.MPPPartitionColumn, 0, len(la.GroupByItems))
+	for _, item := range la.GroupByItems {
+		// Only group-by items that are plain columns become partition keys.
+		if col, ok := item.(*expression.Column); ok {
+			groupByCols = append(groupByCols, &property.MPPPartitionColumn{
+				Col:       col,
+				CollateID: property.GetCollateIDByNameForPartition(col.GetStaticType().GetCollate()),
+			})
+		}
+	}
+	return groupByCols
+}
+
+// GetUsedCols extracts all of the Columns used by agg including GroupByItems and AggFuncs.
+func (la *LogicalAggregation) GetUsedCols() (usedCols []*expression.Column) {
+	for _, groupByItem := range la.GroupByItems {
+		usedCols = append(usedCols, expression.ExtractColumns(groupByItem)...)
+	}
+	for _, aggDesc := range la.AggFuncs {
+		for _, expr := range aggDesc.Args {
+			usedCols = append(usedCols, expression.ExtractColumns(expr)...)
+		}
+		for _, expr := range aggDesc.OrderByItems {
+			usedCols = append(usedCols, expression.ExtractColumns(expr.Expr)...)
+		}
+	}
+	return usedCols
+}
+
+// ResetHintIfConflicted resets the PreferAggType if they are conflicted,
+// and returns the two PreferAggType hints.
+func (la *LogicalAggregation) ResetHintIfConflicted() (preferHash bool, preferStream bool) {
+	preferHash = (la.PreferAggType & h.PreferHashAgg) > 0
+	preferStream = (la.PreferAggType & h.PreferStreamAgg) > 0
+	// Both preferences set: warn, clear the stored preference and report neither.
+	if preferHash && preferStream {
+		la.SCtx().GetSessionVars().StmtCtx.SetHintWarning("Optimizer aggregation hints are conflicted")
+		la.PreferAggType = 0
+		preferHash, preferStream = false, false
+	}
+	return
+}
+
+// DistinctArgsMeetsProperty checks if the distinct args meet the property.
+// It returns true only when every argument of every aggregate function marked
+// HasDistinct is contained in GroupByItems.
+func (la *LogicalAggregation) DistinctArgsMeetsProperty() bool {
+	for _, aggFunc := range la.AggFuncs {
+		if aggFunc.HasDistinct {
+			for _, distinctArg := range aggFunc.Args {
+				if !expression.Contains(la.GroupByItems, distinctArg) {
+					return false
+				}
+			}
+		}
+	}
+	return true
+}
+
+// pushDownCNFPredicatesForAggregation splits a CNF condition into two parts: those that can be pushed down below the aggregation and those that cannot.
+// It would consider the CNF.
+// For example,
+// (a > 1 or avg(b) > 1) and (a < 3), and `avg(b) > 1` can't be pushed-down.
+// Then condsToPush: a < 3, ret: a > 1 or avg(b) > 1
+func (la *LogicalAggregation) pushDownCNFPredicatesForAggregation(cond expression.Expression, groupByColumns *expression.Schema, exprsOriginal []expression.Expression) ([]expression.Expression, []expression.Expression) {
+	var condsToPush []expression.Expression
+	var ret []expression.Expression
+	subCNFItem := expression.SplitCNFItems(cond)
+	// A condition that is a single CNF item is classified directly.
+	if len(subCNFItem) == 1 {
+		return la.pushDownPredicatesForAggregation(subCNFItem[0], groupByColumns, exprsOriginal)
+	}
+	exprCtx := la.SCtx().GetExprCtx()
+	// Each CNF item is split on its own; the pieces returned for one item are
+	// re-joined with ComposeDNFCondition before being collected.
+	for _, item := range subCNFItem {
+		condsToPushForItem, retForItem := la.pushDownDNFPredicatesForAggregation(item, groupByColumns, exprsOriginal)
+		if len(condsToPushForItem) > 0 {
+			condsToPush = append(condsToPush, expression.ComposeDNFCondition(exprCtx, condsToPushForItem...))
+		}
+		if len(retForItem) > 0 {
+			ret = append(ret, expression.ComposeDNFCondition(exprCtx, retForItem...))
+		}
+	}
+	return condsToPush, ret
+}
+
+// pushDownDNFPredicatesForAggregation split a DNF condition to two parts, can be pushed-down or can not be pushed-down below aggregation.
+// It would consider the DNF.
+// For example,
+// (a > 1 and avg(b) > 1) or (a < 3), and `avg(b) > 1` can't be pushed-down.
+// Then condsToPush: (a > 1) or (a < 3), ret: (a > 1 and avg(b) > 1) or (a < 3)
+func (la *LogicalAggregation) pushDownDNFPredicatesForAggregation(cond expression.Expression, groupByColumns *expression.Schema, exprsOriginal []expression.Expression) ([]expression.Expression, []expression.Expression) {
+	//nolint: prealloc
+	var condsToPush []expression.Expression
+	var ret []expression.Expression
+	subDNFItem := expression.SplitDNFItems(cond)
+	// A condition that is a single DNF item is classified directly.
+	if len(subDNFItem) == 1 {
+		return la.pushDownPredicatesForAggregation(subDNFItem[0], groupByColumns, exprsOriginal)
+	}
+	exprCtx := la.SCtx().GetExprCtx()
+	for _, item := range subDNFItem {
+		condsToPushForItem, retForItem := la.pushDownCNFPredicatesForAggregation(item, groupByColumns, exprsOriginal)
+		// If any DNF item has nothing pushable, nothing of the whole condition is pushed.
+		if len(condsToPushForItem) <= 0 {
+			return nil, []expression.Expression{cond}
+		}
+		condsToPush = append(condsToPush, expression.ComposeCNFCondition(exprCtx, condsToPushForItem...))
+		if len(retForItem) > 0 {
+			ret = append(ret, expression.ComposeCNFCondition(exprCtx, retForItem...))
+		}
+	}
+	if len(ret) == 0 {
+		// All the condition can be pushed down.
+		return []expression.Expression{cond}, nil
+	}
+	dnfPushDownCond := expression.ComposeDNFCondition(exprCtx, condsToPush...)
+	// Some condition can't be pushed down, we need to keep all the condition.
+	return []expression.Expression{dnfPushDownCond}, []expression.Expression{cond}
+}
+
+// splitCondForAggregation splits the condition into those who can be pushed and others.
+func (la *LogicalAggregation) splitCondForAggregation(predicates []expression.Expression) ([]expression.Expression, []expression.Expression) {
+	var condsToPush []expression.Expression
+	var ret []expression.Expression
+	// exprsOriginal[i] is the first argument of the i-th aggregate function; it is what a schema
+	// column is substituted with when a predicate is pushed down. Args[0] is read unconditionally,
+	// so every aggregate function is expected to have at least one argument.
+	exprsOriginal := make([]expression.Expression, 0, len(la.AggFuncs))
+	for _, fun := range la.AggFuncs {
+		exprsOriginal = append(exprsOriginal, fun.Args[0])
+	}
+	groupByColumns := expression.NewSchema(la.GetGroupByCols()...)
+	// It's almost the same as pushDownCNFPredicatesForAggregation, except that the condition is a slice.
+	for _, cond := range predicates {
+		subCondsToPush, subRet := la.pushDownDNFPredicatesForAggregation(cond, groupByColumns, exprsOriginal)
+		if len(subCondsToPush) > 0 {
+			condsToPush = append(condsToPush, subCondsToPush...)
+		}
+		if len(subRet) > 0 {
+			ret = append(ret, subRet...)
+		}
+	}
+	return condsToPush, ret
+}
+
+// pushDownPredicatesForAggregation split a condition to two parts, can be pushed-down or can not be pushed-down below aggregation.
+func (la *LogicalAggregation) pushDownPredicatesForAggregation(cond expression.Expression, groupByColumns *expression.Schema, exprsOriginal []expression.Expression) ([]expression.Expression, []expression.Expression) {
+	var condsToPush []expression.Expression
+	var ret []expression.Expression
+	switch cond.(type) {
+	case *expression.Constant:
+		condsToPush = append(condsToPush, cond)
+		// Consider SQL list "select sum(b) from t group by a having 1=0". "1=0" is a constant predicate which should be
+		// retained and pushed down at the same time. Because we will get a wrong query result that contains one column
+		// with value 0 rather than an empty query result.
+		ret = append(ret, cond)
+	case *expression.ScalarFunction:
+		// A scalar predicate is pushable only when every column it references is a group-by column.
+		extractedCols := expression.ExtractColumns(cond)
+		ok := true
+		for _, col := range extractedCols {
+			if !groupByColumns.Contains(col) {
+				ok = false
+				break
+			}
+		}
+		if ok {
+			// Rewrite the aggregation's output columns in terms of exprsOriginal before pushing.
+			newFunc := expression.ColumnSubstitute(la.SCtx().GetExprCtx(), cond, la.Schema(), exprsOriginal)
+			condsToPush = append(condsToPush, newFunc)
+		} else {
+			ret = append(ret, cond)
+		}
+	default:
+		ret = append(ret, cond)
+	}
+	return condsToPush, ret
+}
+
+// buildSelfKeyInfo appends the group-by columns as a key of selfSchema when every group-by item
+// is a plain column found in selfSchema, and marks the plan as max-one-row when there is no group-by item.
+func (la *LogicalAggregation) buildSelfKeyInfo(selfSchema *expression.Schema) {
+	groupByCols := la.GetGroupByCols()
+	if len(groupByCols) == len(la.GroupByItems) && len(la.GroupByItems) > 0 {
+		indices := selfSchema.ColumnsIndices(groupByCols)
+		if indices != nil {
+			newKey := make([]*expression.Column, 0, len(indices))
+			for _, i := range indices {
+				newKey = append(newKey, selfSchema.Columns[i])
+			}
+			selfSchema.Keys = append(selfSchema.Keys, newKey)
+		}
+	}
+	if len(la.GroupByItems) == 0 {
+		la.SetMaxOneRow(true)
+	}
+}
+
+// canPullUp checks if an aggregation can be pulled up. An aggregate function like count(*) cannot be pulled up.
+func (la *LogicalAggregation) canPullUp() bool {
+	if len(la.GroupByItems) > 0 {
+		return false
+	}
+	for _, f := range la.AggFuncs {
+		for _, arg := range f.Args {
+			// Every argument must evaluate to a NULL constant under EvaluateExprWithNull on the child schema.
+			expr := expression.EvaluateExprWithNull(la.SCtx().GetExprCtx(), la.Children()[0].Schema(), arg)
+			if con, ok := expr.(*expression.Constant); !ok || !con.Value.IsNull() {
+				return false
+			}
+		}
+	}
+	return true
+}
+
+// getGroupNDVs returns the child's GroupNDV for gbyCols as a one-element slice, or nil when
+// colGroups is empty or the child profile has no matching GroupNDV.
+func (*LogicalAggregation) getGroupNDVs(colGroups [][]*expression.Column, childProfile *property.StatsInfo, gbyCols []*expression.Column) []property.GroupNDV {
+	if len(colGroups) == 0 {
+		return nil
+	}
+	// Check if the child profile provides GroupNDV for the GROUP BY columns.
+	// Note that gbyCols may not be the exact GROUP BY columns, e.g, GROUP BY a+b,
+	// but we have no other approaches for the NDV estimation of these cases
+	// except for using the independent assumption, unless we can use stats of expression index.
+	groupNDV := childProfile.GetGroupNDV4Cols(gbyCols)
+	if groupNDV == nil {
+		return nil
+	}
+	return []property.GroupNDV{*groupNDV}
+}
diff --git a/pkg/planner/core/logical_initialize.go b/pkg/planner/core/logical_initialize.go
new file mode 100644
index 0000000000..7fc62ed533
--- /dev/null
+++ b/pkg/planner/core/logical_initialize.go
@@ -0,0 +1,171 @@
+// Copyright 2024 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package core
+
+import (
+	"github.com/pingcap/tidb/pkg/planner/core/base"
+	"github.com/pingcap/tidb/pkg/planner/core/operator/logicalop"
+	"github.com/pingcap/tidb/pkg/util/plancodec"
+)
+
+// Init initializes LogicalJoin.
+func (p LogicalJoin) Init(ctx base.PlanContext, offset int) *LogicalJoin {
+	p.BaseLogicalPlan = logicalop.NewBaseLogicalPlan(ctx, plancodec.TypeJoin, &p, offset)
+	return &p
+}
+
+// Init initializes DataSource.
+func (ds DataSource) Init(ctx base.PlanContext, offset int) *DataSource {
+	ds.BaseLogicalPlan = logicalop.NewBaseLogicalPlan(ctx, plancodec.TypeDataSource, &ds, offset)
+	return &ds
+}
+
+// Init initializes TiKVSingleGather.
+func (sg TiKVSingleGather) Init(ctx base.PlanContext, offset int) *TiKVSingleGather {
+	sg.BaseLogicalPlan = logicalop.NewBaseLogicalPlan(ctx, plancodec.TypeTiKVSingleGather, &sg, offset)
+	return &sg
+}
+
+// Init initializes LogicalTableScan.
+func (ts LogicalTableScan) Init(ctx base.PlanContext, offset int) *LogicalTableScan {
+	ts.BaseLogicalPlan = logicalop.NewBaseLogicalPlan(ctx, plancodec.TypeTableScan, &ts, offset)
+	return &ts
+}
+
+// Init initializes LogicalIndexScan.
+func (is LogicalIndexScan) Init(ctx base.PlanContext, offset int) *LogicalIndexScan {
+	is.BaseLogicalPlan = logicalop.NewBaseLogicalPlan(ctx, plancodec.TypeIdxScan, &is, offset)
+	return &is
+}
+
+// Init initializes LogicalApply.
+func (la LogicalApply) Init(ctx base.PlanContext, offset int) *LogicalApply {
+	la.BaseLogicalPlan = logicalop.NewBaseLogicalPlan(ctx, plancodec.TypeApply, &la, offset)
+	return &la
+}
+
+// Init initializes LogicalSelection.
+func (p LogicalSelection) Init(ctx base.PlanContext, qbOffset int) *LogicalSelection {
+	p.BaseLogicalPlan = logicalop.NewBaseLogicalPlan(ctx, plancodec.TypeSel, &p, qbOffset)
+	return &p
+}
+
+// Init initializes LogicalUnionScan.
+func (p LogicalUnionScan) Init(ctx base.PlanContext, qbOffset int) *LogicalUnionScan {
+	p.BaseLogicalPlan = logicalop.NewBaseLogicalPlan(ctx, plancodec.TypeUnionScan, &p, qbOffset)
+	return &p
+}
+
+// Init initializes LogicalProjection.
+func (p LogicalProjection) Init(ctx base.PlanContext, qbOffset int) *LogicalProjection {
+	p.BaseLogicalPlan = logicalop.NewBaseLogicalPlan(ctx, plancodec.TypeProj, &p, qbOffset)
+	return &p
+}
+
+// Init initializes LogicalExpand.
+func (p LogicalExpand) Init(ctx base.PlanContext, offset int) *LogicalExpand {
+	p.BaseLogicalPlan = logicalop.NewBaseLogicalPlan(ctx, plancodec.TypeExpand, &p, offset)
+	return &p
+}
+
+// Init initializes LogicalUnionAll.
+func (p LogicalUnionAll) Init(ctx base.PlanContext, offset int) *LogicalUnionAll {
+	p.BaseLogicalPlan = logicalop.NewBaseLogicalPlan(ctx, plancodec.TypeUnion, &p, offset)
+	return &p
+}
+
+// Init initializes LogicalPartitionUnionAll.
+func (p LogicalPartitionUnionAll) Init(ctx base.PlanContext, offset int) *LogicalPartitionUnionAll {
+	p.BaseLogicalPlan = logicalop.NewBaseLogicalPlan(ctx, plancodec.TypePartitionUnion, &p, offset)
+	return &p
+}
+
+// Init initializes LogicalSort.
+func (ls LogicalSort) Init(ctx base.PlanContext, offset int) *LogicalSort {
+	ls.BaseLogicalPlan = logicalop.NewBaseLogicalPlan(ctx, plancodec.TypeSort, &ls, offset)
+	return &ls
+}
+
+// Init initializes LogicalTopN.
+func (lt LogicalTopN) Init(ctx base.PlanContext, offset int) *LogicalTopN {
+	lt.BaseLogicalPlan = logicalop.NewBaseLogicalPlan(ctx, plancodec.TypeTopN, &lt, offset)
+	return &lt
+}
+
+// Init initializes LogicalLimit.
+func (p LogicalLimit) Init(ctx base.PlanContext, offset int) *LogicalLimit {
+	p.BaseLogicalPlan = logicalop.NewBaseLogicalPlan(ctx, plancodec.TypeLimit, &p, offset)
+	return &p
+}
+
+// Init initializes LogicalTableDual.
+func (p LogicalTableDual) Init(ctx base.PlanContext, offset int) *LogicalTableDual {
+	p.BaseLogicalPlan = logicalop.NewBaseLogicalPlan(ctx, plancodec.TypeDual, &p, offset)
+	return &p
+}
+
+// Init initializes LogicalMaxOneRow.
+func (p LogicalMaxOneRow) Init(ctx base.PlanContext, offset int) *LogicalMaxOneRow {
+	p.BaseLogicalPlan = logicalop.NewBaseLogicalPlan(ctx, plancodec.TypeMaxOneRow, &p, offset)
+	return &p
+}
+
+// Init initializes LogicalWindow.
+func (p LogicalWindow) Init(ctx base.PlanContext, offset int) *LogicalWindow {
+	p.BaseLogicalPlan = logicalop.NewBaseLogicalPlan(ctx, plancodec.TypeWindow, &p, offset)
+	return &p
+}
+
+// Init initializes LogicalShow.
+func (p LogicalShow) Init(ctx base.PlanContext) *LogicalShow {
+	p.BaseLogicalPlan = logicalop.NewBaseLogicalPlan(ctx, plancodec.TypeShow, &p, 0)
+	return &p
+}
+
+// Init initializes LogicalShowDDLJobs.
+func (p LogicalShowDDLJobs) Init(ctx base.PlanContext) *LogicalShowDDLJobs {
+	p.BaseLogicalPlan = logicalop.NewBaseLogicalPlan(ctx, plancodec.TypeShowDDLJobs, &p, 0)
+	return &p
+}
+
+// Init initializes LogicalLock.
+func (p LogicalLock) Init(ctx base.PlanContext) *LogicalLock {
+	p.BaseLogicalPlan = logicalop.NewBaseLogicalPlan(ctx, plancodec.TypeLock, &p, 0)
+	return &p
+}
+
+// Init initializes LogicalMemTable.
+func (p LogicalMemTable) Init(ctx base.PlanContext, offset int) *LogicalMemTable {
+	p.BaseLogicalPlan = logicalop.NewBaseLogicalPlan(ctx, plancodec.TypeMemTableScan, &p, offset)
+	return &p
+}
+
+// Init only assigns type and context.
+func (p LogicalCTE) Init(ctx base.PlanContext, offset int) *LogicalCTE {
+	p.BaseLogicalPlan = logicalop.NewBaseLogicalPlan(ctx, plancodec.TypeCTE, &p, offset)
+	return &p
+}
+
+// Init only assigns type and context.
+func (p LogicalCTETable) Init(ctx base.PlanContext, offset int) *LogicalCTETable {
+	p.BaseLogicalPlan = logicalop.NewBaseLogicalPlan(ctx, plancodec.TypeCTETable, &p, offset)
+	return &p
+}
+
+// Init initializes LogicalSequence.
+func (p LogicalSequence) Init(ctx base.PlanContext, offset int) *LogicalSequence {
+	p.BaseLogicalPlan = logicalop.NewBaseLogicalPlan(ctx, plancodec.TypeSequence, &p, offset)
+	return &p
+}
diff --git a/pkg/planner/core/logical_plans.go b/pkg/planner/core/logical_plans.go
index 81c3240ed9..12610f84e9 100644
--- a/pkg/planner/core/logical_plans.go
+++ b/pkg/planner/core/logical_plans.go
@@ -915,250 +915,6 @@ func (p *LogicalProjection) GetUsedCols() (usedCols []*expression.Column) {
 	return usedCols
 }
 
-// LogicalAggregation represents an aggregate plan.
-type LogicalAggregation struct { - logicalop.LogicalSchemaProducer - - AggFuncs []*aggregation.AggFuncDesc - GroupByItems []expression.Expression - - // PreferAggType And PreferAggToCop stores aggregation hint information. - PreferAggType uint - PreferAggToCop bool - - PossibleProperties [][]*expression.Column - InputCount float64 // InputCount is the input count of this plan. - - // NoCopPushDown indicates if planner must not push this agg down to coprocessor. - // It is true when the agg is in the outer child tree of apply. - NoCopPushDown bool -} - -// HasDistinct shows whether LogicalAggregation has functions with distinct. -func (la *LogicalAggregation) HasDistinct() bool { - for _, aggFunc := range la.AggFuncs { - if aggFunc.HasDistinct { - return true - } - } - return false -} - -// HasOrderBy shows whether LogicalAggregation has functions with order-by items. -func (la *LogicalAggregation) HasOrderBy() bool { - for _, aggFunc := range la.AggFuncs { - if len(aggFunc.OrderByItems) > 0 { - return true - } - } - return false -} - -// ExtractFD implements the logical plan interface, extracting the FD from bottom up. -// 1: -// In most of the cases, using FDs to check the only_full_group_by problem should be done in the buildAggregation phase -// by extracting the bottom-up FDs graph from the `p` --- the sub plan tree that has already been built. -// -// 2: -// and this requires that some conditions push-down into the `p` like selection should be done before building aggregation, -// otherwise, 'a=1 and a can occur in the select lists of a group by' will be miss-checked because it doesn't be implied in the known FDs graph. -// -// 3: -// when a logical agg is built, it's schema columns indicates what the permitted-non-agg columns is. Therefore, we shouldn't -// depend on logicalAgg.ExtractFD() to finish the only_full_group_by checking problem rather than by 1 & 2. -func (la *LogicalAggregation) ExtractFD() *fd.FDSet { - // basically extract the children's fdSet. 
- fds := la.LogicalSchemaProducer.ExtractFD() - // collect the output columns' unique ID. - outputColsUniqueIDs := intset.NewFastIntSet() - notnullColsUniqueIDs := intset.NewFastIntSet() - groupByColsUniqueIDs := intset.NewFastIntSet() - groupByColsOutputCols := intset.NewFastIntSet() - // Since the aggregation is build ahead of projection, the latter one will reuse the column with UniqueID allocated in aggregation - // via aggMapper, so we don't need unnecessarily maintain the mapping in the FDSet like expr did, just treating - // it as normal column. - for _, one := range la.Schema().Columns { - outputColsUniqueIDs.Insert(int(one.UniqueID)) - } - // For one like sum(a), we don't need to build functional dependency from a --> sum(a), cause it's only determined by the - // group-by-item (group-by-item --> sum(a)). - for _, expr := range la.GroupByItems { - switch x := expr.(type) { - case *expression.Column: - groupByColsUniqueIDs.Insert(int(x.UniqueID)) - case *expression.CorrelatedColumn: - // shouldn't be here, intercepted by plan builder as unknown column. - continue - case *expression.Constant: - // shouldn't be here, interpreted as pos param by plan builder. - continue - case *expression.ScalarFunction: - hashCode := string(x.HashCode()) - var ( - ok bool - scalarUniqueID int - ) - if scalarUniqueID, ok = fds.IsHashCodeRegistered(hashCode); ok { - groupByColsUniqueIDs.Insert(scalarUniqueID) - } else { - // retrieve unique plan column id. 1: completely new one, allocating new unique id. 2: registered by projection earlier, using it. 
- if scalarUniqueID, ok = la.SCtx().GetSessionVars().MapHashCode2UniqueID4ExtendedCol[hashCode]; !ok { - scalarUniqueID = int(la.SCtx().GetSessionVars().AllocPlanColumnID()) - } - fds.RegisterUniqueID(hashCode, scalarUniqueID) - groupByColsUniqueIDs.Insert(scalarUniqueID) - } - determinants := intset.NewFastIntSet() - extractedColumns := expression.ExtractColumns(x) - extractedCorColumns := expression.ExtractCorColumns(x) - for _, one := range extractedColumns { - determinants.Insert(int(one.UniqueID)) - groupByColsOutputCols.Insert(int(one.UniqueID)) - } - for _, one := range extractedCorColumns { - determinants.Insert(int(one.UniqueID)) - groupByColsOutputCols.Insert(int(one.UniqueID)) - } - notnull := util.IsNullRejected(la.SCtx(), la.Schema(), x) - if notnull || determinants.SubsetOf(fds.NotNullCols) { - notnullColsUniqueIDs.Insert(scalarUniqueID) - } - fds.AddStrictFunctionalDependency(determinants, intset.NewFastIntSet(scalarUniqueID)) - } - } - - // Some details: - // For now, select max(a) from t group by c, tidb will see `max(a)` as Max aggDes and `a,b,c` as firstRow aggDes, - // and keep them all in the schema columns before projection does the pruning. If we build the fake FD eg: {c} ~~> {b} - // here since we have seen b as firstRow aggDes, for the upper layer projection of `select max(a), b from t group by c`, - // it will take b as valid projection field of group by statement since it has existed in the FD with {c} ~~> {b}. - // - // and since any_value will NOT be pushed down to agg schema, which means every firstRow aggDes in the agg logical operator - // is meaningless to build the FD with. Let's only store the non-firstRow FD down: {group by items} ~~> {real aggDes} - realAggFuncUniqueID := intset.NewFastIntSet() - for i, aggDes := range la.AggFuncs { - if aggDes.Name != "firstrow" { - realAggFuncUniqueID.Insert(int(la.Schema().Columns[i].UniqueID)) - } - } - - // apply operator's characteristic's FD setting. 
- if len(la.GroupByItems) == 0 { - // 1: as the details shown above, output cols (normal column seen as firstrow) of group by are not validated. - // we couldn't merge them as constant FD with origin constant FD together before projection done. - // fds.MaxOneRow(outputColsUniqueIDs.Union(groupByColsOutputCols)) - // - // 2: for the convenience of later judgement, when there is no group by items, we will store a FD: {0} -> {real aggDes} - // 0 unique id is only used for here. - groupByColsUniqueIDs.Insert(0) - for i, ok := realAggFuncUniqueID.Next(0); ok; i, ok = realAggFuncUniqueID.Next(i + 1) { - fds.AddStrictFunctionalDependency(groupByColsUniqueIDs, intset.NewFastIntSet(i)) - } - } else { - // eliminating input columns that are un-projected. - fds.ProjectCols(outputColsUniqueIDs.Union(groupByColsOutputCols).Union(groupByColsUniqueIDs)) - - // note: {a} --> {b,c} is not same with {a} --> {b} and {a} --> {c} - for i, ok := realAggFuncUniqueID.Next(0); ok; i, ok = realAggFuncUniqueID.Next(i + 1) { - // group by phrase always produce strict FD. - // 1: it can always distinguish and group the all-null/part-null group column rows. - // 2: the rows with all/part null group column are unique row after group operation. - // 3: there won't be two same group key with different agg values, so strict FD secured. - fds.AddStrictFunctionalDependency(groupByColsUniqueIDs, intset.NewFastIntSet(i)) - } - - // agg funcDes has been tag not null flag when building aggregation. - fds.MakeNotNull(notnullColsUniqueIDs) - } - fds.GroupByCols = groupByColsUniqueIDs - fds.HasAggBuilt = true - // just trace it down in every operator for test checking. - la.SetFDs(fds) - return fds -} - -// CopyAggHints copies the aggHints from another LogicalAggregation. -func (la *LogicalAggregation) CopyAggHints(agg *LogicalAggregation) { - // TODO: Copy the hint may make the un-applicable hint throw the - // same warning message more than once. 
We'd better add a flag for - // `HaveThrownWarningMessage` to avoid this. Besides, finalAgg and - // partialAgg (in cascades planner) should share the same hint, instead - // of a copy. - la.PreferAggType = agg.PreferAggType - la.PreferAggToCop = agg.PreferAggToCop -} - -// IsPartialModeAgg returns if all of the AggFuncs are partialMode. -func (la *LogicalAggregation) IsPartialModeAgg() bool { - // Since all of the AggFunc share the same AggMode, we only need to check the first one. - return la.AggFuncs[0].Mode == aggregation.Partial1Mode -} - -// IsCompleteModeAgg returns if all of the AggFuncs are CompleteMode. -func (la *LogicalAggregation) IsCompleteModeAgg() bool { - // Since all of the AggFunc share the same AggMode, we only need to check the first one. - return la.AggFuncs[0].Mode == aggregation.CompleteMode -} - -// GetGroupByCols returns the columns that are group-by items. -// For example, `group by a, b, c+d` will return [a, b]. -func (la *LogicalAggregation) GetGroupByCols() []*expression.Column { - groupByCols := make([]*expression.Column, 0, len(la.GroupByItems)) - for _, item := range la.GroupByItems { - if col, ok := item.(*expression.Column); ok { - groupByCols = append(groupByCols, col) - } - } - return groupByCols -} - -// GetPotentialPartitionKeys return potential partition keys for aggregation, the potential partition keys are the group by keys -func (la *LogicalAggregation) GetPotentialPartitionKeys() []*property.MPPPartitionColumn { - groupByCols := make([]*property.MPPPartitionColumn, 0, len(la.GroupByItems)) - for _, item := range la.GroupByItems { - if col, ok := item.(*expression.Column); ok { - groupByCols = append(groupByCols, &property.MPPPartitionColumn{ - Col: col, - CollateID: property.GetCollateIDByNameForPartition(col.GetStaticType().GetCollate()), - }) - } - } - return groupByCols -} - -// ExtractCorrelatedCols implements LogicalPlan interface. 
-func (la *LogicalAggregation) ExtractCorrelatedCols() []*expression.CorrelatedColumn { - corCols := make([]*expression.CorrelatedColumn, 0, len(la.GroupByItems)+len(la.AggFuncs)) - for _, expr := range la.GroupByItems { - corCols = append(corCols, expression.ExtractCorColumns(expr)...) - } - for _, fun := range la.AggFuncs { - for _, arg := range fun.Args { - corCols = append(corCols, expression.ExtractCorColumns(arg)...) - } - for _, arg := range fun.OrderByItems { - corCols = append(corCols, expression.ExtractCorColumns(arg.Expr)...) - } - } - return corCols -} - -// GetUsedCols extracts all of the Columns used by agg including GroupByItems and AggFuncs. -func (la *LogicalAggregation) GetUsedCols() (usedCols []*expression.Column) { - for _, groupByItem := range la.GroupByItems { - usedCols = append(usedCols, expression.ExtractColumns(groupByItem)...) - } - for _, aggDesc := range la.AggFuncs { - for _, expr := range aggDesc.Args { - usedCols = append(usedCols, expression.ExtractColumns(expr)...) - } - for _, expr := range aggDesc.OrderByItems { - usedCols = append(usedCols, expression.ExtractColumns(expr.Expr)...) - } - } - return usedCols -} - // LogicalSelection represents a where or having predicate. type LogicalSelection struct { logicalop.BaseLogicalPlan diff --git a/pkg/planner/core/property_cols_prune.go b/pkg/planner/core/property_cols_prune.go index b6ca4b5e10..41df48dc74 100644 --- a/pkg/planner/core/property_cols_prune.go +++ b/pkg/planner/core/property_cols_prune.go @@ -186,25 +186,3 @@ func (p *LogicalJoin) PreparePossibleProperties(_ *expression.Schema, childrenPr } return resultProperties } - -// PreparePossibleProperties implements base.LogicalPlan PreparePossibleProperties interface. 
-func (la *LogicalAggregation) PreparePossibleProperties(_ *expression.Schema, childrenProperties ...[][]*expression.Column) [][]*expression.Column { - childProps := childrenProperties[0] - // If there's no group-by item, the stream aggregation could have no order property. So we can add an empty property - // when its group-by item is empty. - if len(la.GroupByItems) == 0 { - la.PossibleProperties = [][]*expression.Column{nil} - return nil - } - resultProperties := make([][]*expression.Column, 0, len(childProps)) - groupByCols := la.GetGroupByCols() - for _, possibleChildProperty := range childProps { - sortColOffsets := getMaxSortPrefix(possibleChildProperty, groupByCols) - if len(sortColOffsets) == len(groupByCols) { - prop := possibleChildProperty[:len(groupByCols)] - resultProperties = append(resultProperties, prop) - } - } - la.PossibleProperties = resultProperties - return resultProperties -} diff --git a/pkg/planner/core/rule_aggregation_push_down.go b/pkg/planner/core/rule_aggregation_push_down.go index 28557580f1..44fc56b812 100644 --- a/pkg/planner/core/rule_aggregation_push_down.go +++ b/pkg/planner/core/rule_aggregation_push_down.go @@ -565,7 +565,7 @@ func (a *aggregationPushDownSolver) aggPushDown(p base.LogicalPlan, opt *optimiz break } newGbyItems = append(newGbyItems, groupBy) - if ExprsHasSideEffects(newGbyItems) { + if expression.ExprsHasSideEffects(newGbyItems) { noSideEffects = false break } @@ -586,7 +586,7 @@ func (a *aggregationPushDownSolver) aggPushDown(p base.LogicalPlan, opt *optimiz } newArgs = append(newArgs, newArg) } - if ExprsHasSideEffects(newArgs) { + if expression.ExprsHasSideEffects(newArgs) { noSideEffects = false break } @@ -603,7 +603,7 @@ func (a *aggregationPushDownSolver) aggPushDown(p base.LogicalPlan, opt *optimiz } newOrderByItems = append(newOrderByItems, byItem) } - if ExprsHasSideEffects(newOrderByItems) { + if expression.ExprsHasSideEffects(newOrderByItems) { noSideEffects = false break } diff --git 
a/pkg/planner/core/rule_build_key_info.go b/pkg/planner/core/rule_build_key_info.go index ce69b0e741..461551cab8 100644 --- a/pkg/planner/core/rule_build_key_info.go +++ b/pkg/planner/core/rule_build_key_info.go @@ -45,36 +45,6 @@ func buildKeyInfo(lp base.LogicalPlan) { lp.BuildKeyInfo(lp.Schema(), childSchema) } -// BuildKeyInfo implements base.LogicalPlan BuildKeyInfo interface. -func (la *LogicalAggregation) BuildKeyInfo(selfSchema *expression.Schema, childSchema []*expression.Schema) { - // According to the issue#46962, we can ignore the judgment of partial agg - // Sometimes, the agg inside of subquery and there is a true condition in where clause, the agg function is empty. - // For example, ``` select xxxx from xxx WHERE TRUE = ALL ( SELECT TRUE GROUP BY 1 LIMIT 1 ) IS NULL IS NOT NULL; - // In this case, the agg is complete mode and we can ignore this check. - if len(la.AggFuncs) != 0 && la.IsPartialModeAgg() { - return - } - la.LogicalSchemaProducer.BuildKeyInfo(selfSchema, childSchema) - la.buildSelfKeyInfo(selfSchema) -} - -func (la *LogicalAggregation) buildSelfKeyInfo(selfSchema *expression.Schema) { - groupByCols := la.GetGroupByCols() - if len(groupByCols) == len(la.GroupByItems) && len(la.GroupByItems) > 0 { - indices := selfSchema.ColumnsIndices(groupByCols) - if indices != nil { - newKey := make([]*expression.Column, 0, len(indices)) - for _, i := range indices { - newKey = append(newKey, selfSchema.Columns[i]) - } - selfSchema.Keys = append(selfSchema.Keys, newKey) - } - } - if len(la.GroupByItems) == 0 { - la.SetMaxOneRow(true) - } -} - // If a condition is the form of (uniqueKey = constant) or (uniqueKey = Correlated column), it returns at most one row. // This function will check it. 
func checkMaxOneRowCond(eqColIDs map[int64]struct{}, childSchema *expression.Schema) bool { diff --git a/pkg/planner/core/rule_column_pruning.go b/pkg/planner/core/rule_column_pruning.go index 91b81ed0ec..f2e3556e63 100644 --- a/pkg/planner/core/rule_column_pruning.go +++ b/pkg/planner/core/rule_column_pruning.go @@ -19,9 +19,7 @@ import ( "slices" "github.com/pingcap/tidb/pkg/expression" - "github.com/pingcap/tidb/pkg/expression/aggregation" "github.com/pingcap/tidb/pkg/infoschema" - "github.com/pingcap/tidb/pkg/parser/ast" "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tidb/pkg/parser/mysql" "github.com/pingcap/tidb/pkg/planner/core/base" @@ -44,33 +42,6 @@ func (*columnPruner) optimize(_ context.Context, lp base.LogicalPlan, opt *optim return lp, planChanged, nil } -// ExprsHasSideEffects checks if any of the expressions has side effects. -func ExprsHasSideEffects(exprs []expression.Expression) bool { - for _, expr := range exprs { - if exprHasSetVarOrSleep(expr) { - return true - } - } - return false -} - -// exprHasSetVarOrSleep checks if the expression has SetVar function or Sleep function. -func exprHasSetVarOrSleep(expr expression.Expression) bool { - scalaFunc, isScalaFunc := expr.(*expression.ScalarFunction) - if !isScalaFunc { - return false - } - if scalaFunc.FuncName.L == ast.SetVar || scalaFunc.FuncName.L == ast.Sleep { - return true - } - for _, arg := range scalaFunc.GetArgs() { - if exprHasSetVarOrSleep(arg) { - return true - } - } - return false -} - // PruneColumns implement the Expand OP's column pruning logic. // logicExpand is built in the logical plan building phase, where all the column prune is not done yet. So the // expand projection expressions is meaningless if it built at that time. 
(we only maintain its schema, while @@ -108,7 +79,7 @@ func (p *LogicalProjection) PruneColumns(parentUsedCols []*expression.Column, op // for implicit projected cols, once the ancestor doesn't use it, the implicit expr will be automatically pruned here. for i := len(used) - 1; i >= 0; i-- { - if !used[i] && !exprHasSetVarOrSleep(p.Exprs[i]) { + if !used[i] && !expression.ExprHasSetVarOrSleep(p.Exprs[i]) { prunedColumns = append(prunedColumns, p.Schema().Columns[i]) p.Schema().Columns = append(p.Schema().Columns[:i], p.Schema().Columns[i+1:]...) p.Exprs = append(p.Exprs[:i], p.Exprs[i+1:]...) @@ -138,100 +109,6 @@ func (p *LogicalSelection) PruneColumns(parentUsedCols []*expression.Column, opt return p, nil } -// PruneColumns implements base.LogicalPlan interface. -func (la *LogicalAggregation) PruneColumns(parentUsedCols []*expression.Column, opt *optimizetrace.LogicalOptimizeOp) (base.LogicalPlan, error) { - child := la.Children()[0] - used := expression.GetUsedList(la.SCtx().GetExprCtx().GetEvalCtx(), parentUsedCols, la.Schema()) - prunedColumns := make([]*expression.Column, 0) - prunedFunctions := make([]*aggregation.AggFuncDesc, 0) - prunedGroupByItems := make([]expression.Expression, 0) - - allFirstRow := true - allRemainFirstRow := true - for i := len(used) - 1; i >= 0; i-- { - if la.AggFuncs[i].Name != ast.AggFuncFirstRow { - allFirstRow = false - } - if !used[i] && !ExprsHasSideEffects(la.AggFuncs[i].Args) { - prunedColumns = append(prunedColumns, la.Schema().Columns[i]) - prunedFunctions = append(prunedFunctions, la.AggFuncs[i]) - la.Schema().Columns = append(la.Schema().Columns[:i], la.Schema().Columns[i+1:]...) - la.AggFuncs = append(la.AggFuncs[:i], la.AggFuncs[i+1:]...) 
- } else if la.AggFuncs[i].Name != ast.AggFuncFirstRow { - allRemainFirstRow = false - } - } - logicaltrace.AppendColumnPruneTraceStep(la, prunedColumns, opt) - logicaltrace.AppendFunctionPruneTraceStep(la, prunedFunctions, opt) - //nolint: prealloc - var selfUsedCols []*expression.Column - for _, aggrFunc := range la.AggFuncs { - selfUsedCols = expression.ExtractColumnsFromExpressions(selfUsedCols, aggrFunc.Args, nil) - - var cols []*expression.Column - aggrFunc.OrderByItems, cols = pruneByItems(la, aggrFunc.OrderByItems, opt) - selfUsedCols = append(selfUsedCols, cols...) - } - if len(la.AggFuncs) == 0 || (!allFirstRow && allRemainFirstRow) { - // If all the aggregate functions are pruned, we should add an aggregate function to maintain the info of row numbers. - // For all the aggregate functions except `first_row`, if we have an empty table defined as t(a,b), - // `select agg(a) from t` would always return one row, while `select agg(a) from t group by b` would return empty. - // For `first_row` which is only used internally by tidb, `first_row(a)` would always return empty for empty input now. 
- var err error - var newAgg *aggregation.AggFuncDesc - if allFirstRow { - newAgg, err = aggregation.NewAggFuncDesc(la.SCtx().GetExprCtx(), ast.AggFuncFirstRow, []expression.Expression{expression.NewOne()}, false) - } else { - newAgg, err = aggregation.NewAggFuncDesc(la.SCtx().GetExprCtx(), ast.AggFuncCount, []expression.Expression{expression.NewOne()}, false) - } - if err != nil { - return nil, err - } - la.AggFuncs = append(la.AggFuncs, newAgg) - col := &expression.Column{ - UniqueID: la.SCtx().GetSessionVars().AllocPlanColumnID(), - RetType: newAgg.RetTp, - } - la.Schema().Columns = append(la.Schema().Columns, col) - } - - if len(la.GroupByItems) > 0 { - for i := len(la.GroupByItems) - 1; i >= 0; i-- { - cols := expression.ExtractColumns(la.GroupByItems[i]) - if len(cols) == 0 && !exprHasSetVarOrSleep(la.GroupByItems[i]) { - prunedGroupByItems = append(prunedGroupByItems, la.GroupByItems[i]) - la.GroupByItems = append(la.GroupByItems[:i], la.GroupByItems[i+1:]...) - } else { - selfUsedCols = append(selfUsedCols, cols...) - } - } - // If all the group by items are pruned, we should add a constant 1 to keep the correctness. - // Because `select count(*) from t` is different from `select count(*) from t group by 1`. - if len(la.GroupByItems) == 0 { - la.GroupByItems = []expression.Expression{expression.NewOne()} - } - } - logicaltrace.AppendGroupByItemsPruneTraceStep(la, prunedGroupByItems, opt) - var err error - la.Children()[0], err = child.PruneColumns(selfUsedCols, opt) - if err != nil { - return nil, err - } - // update children[0] - child = la.Children()[0] - // Do an extra Projection Elimination here. This is specially for empty Projection below Aggregation. - // This kind of Projection would cause some bugs for MPP plan and is safe to be removed. - // This kind of Projection should be removed in Projection Elimination, but currently PrunColumnsAgain is - // the last rule. So we specially handle this case here. 
- if childProjection, isProjection := child.(*LogicalProjection); isProjection { - if len(childProjection.Exprs) == 0 && childProjection.Schema().Len() == 0 { - childOfChild := childProjection.Children()[0] - la.SetChildren(childOfChild) - } - } - return la, nil -} - func pruneByItems(p base.LogicalPlan, old []*util.ByItems, opt *optimizetrace.LogicalOptimizeOp) (byItems []*util.ByItems, parentUsedCols []*expression.Column) { prunedByItems := make([]*util.ByItems, 0) diff --git a/pkg/planner/core/rule_decorrelate.go b/pkg/planner/core/rule_decorrelate.go index d1546a6100..3cd29c8807 100644 --- a/pkg/planner/core/rule_decorrelate.go +++ b/pkg/planner/core/rule_decorrelate.go @@ -42,22 +42,6 @@ func (la *LogicalApply) canPullUpAgg() bool { return len(la.Children()[0].Schema().Keys) > 0 } -// canPullUp checks if an aggregation can be pulled up. An aggregate function like count(*) cannot be pulled up. -func (la *LogicalAggregation) canPullUp() bool { - if len(la.GroupByItems) > 0 { - return false - } - for _, f := range la.AggFuncs { - for _, arg := range f.Args { - expr := expression.EvaluateExprWithNull(la.SCtx().GetExprCtx(), la.Children()[0].Schema(), arg) - if con, ok := expr.(*expression.Constant); !ok || !con.Value.IsNull() { - return false - } - } - } - return true -} - // deCorColFromEqExpr checks whether it's an equal condition of form `col = correlated col`. If so we will change the decorrelated // column to normal column to make a new equal condition. 
func (la *LogicalApply) deCorColFromEqExpr(expr expression.Expression) expression.Expression { diff --git a/pkg/planner/core/rule_eliminate_projection.go b/pkg/planner/core/rule_eliminate_projection.go index 66de4b07b3..ac813ae51d 100644 --- a/pkg/planner/core/rule_eliminate_projection.go +++ b/pkg/planner/core/rule_eliminate_projection.go @@ -212,7 +212,7 @@ func (pe *projectionEliminator) eliminate(p base.LogicalPlan, replace map[string // eliminate duplicate projection: projection with child projection if isProj { - if child, ok := p.Children()[0].(*LogicalProjection); ok && !ExprsHasSideEffects(child.Exprs) { + if child, ok := p.Children()[0].(*LogicalProjection); ok && !expression.ExprsHasSideEffects(child.Exprs) { ctx := p.SCtx() for i := range proj.Exprs { proj.Exprs[i] = ReplaceColumnOfExpr(proj.Exprs[i], child, child.Schema()) @@ -276,21 +276,6 @@ func (p *LogicalProjection) ReplaceExprColumns(replace map[string]*expression.Co } } -// ReplaceExprColumns implements base.LogicalPlan interface. -func (la *LogicalAggregation) ReplaceExprColumns(replace map[string]*expression.Column) { - for _, agg := range la.AggFuncs { - for _, aggExpr := range agg.Args { - ResolveExprAndReplace(aggExpr, replace) - } - for _, orderExpr := range agg.OrderByItems { - ResolveExprAndReplace(orderExpr.Expr, replace) - } - } - for _, gbyItem := range la.GroupByItems { - ResolveExprAndReplace(gbyItem, replace) - } -} - // ReplaceExprColumns implements base.LogicalPlan interface. 
func (p *LogicalSelection) ReplaceExprColumns(replace map[string]*expression.Column) { for _, expr := range p.Conditions { diff --git a/pkg/planner/core/rule_predicate_push_down.go b/pkg/planner/core/rule_predicate_push_down.go index 2be5571eef..034e2bd91a 100644 --- a/pkg/planner/core/rule_predicate_push_down.go +++ b/pkg/planner/core/rule_predicate_push_down.go @@ -431,125 +431,6 @@ func (p *LogicalUnionAll) PredicatePushDown(predicates []expression.Expression, return nil, p } -// pushDownPredicatesForAggregation split a condition to two parts, can be pushed-down or can not be pushed-down below aggregation. -func (la *LogicalAggregation) pushDownPredicatesForAggregation(cond expression.Expression, groupByColumns *expression.Schema, exprsOriginal []expression.Expression) ([]expression.Expression, []expression.Expression) { - var condsToPush []expression.Expression - var ret []expression.Expression - switch cond.(type) { - case *expression.Constant: - condsToPush = append(condsToPush, cond) - // Consider SQL list "select sum(b) from t group by a having 1=0". "1=0" is a constant predicate which should be - // retained and pushed down at the same time. Because we will get a wrong query result that contains one column - // with value 0 rather than an empty query result. - ret = append(ret, cond) - case *expression.ScalarFunction: - extractedCols := expression.ExtractColumns(cond) - ok := true - for _, col := range extractedCols { - if !groupByColumns.Contains(col) { - ok = false - break - } - } - if ok { - newFunc := expression.ColumnSubstitute(la.SCtx().GetExprCtx(), cond, la.Schema(), exprsOriginal) - condsToPush = append(condsToPush, newFunc) - } else { - ret = append(ret, cond) - } - default: - ret = append(ret, cond) - } - return condsToPush, ret -} - -// pushDownPredicatesForAggregation split a CNF condition to two parts, can be pushed-down or can not be pushed-down below aggregation. -// It would consider the CNF. 
-// For example, -// (a > 1 or avg(b) > 1) and (a < 3), and `avg(b) > 1` can't be pushed-down. -// Then condsToPush: a < 3, ret: a > 1 or avg(b) > 1 -func (la *LogicalAggregation) pushDownCNFPredicatesForAggregation(cond expression.Expression, groupByColumns *expression.Schema, exprsOriginal []expression.Expression) ([]expression.Expression, []expression.Expression) { - var condsToPush []expression.Expression - var ret []expression.Expression - subCNFItem := expression.SplitCNFItems(cond) - if len(subCNFItem) == 1 { - return la.pushDownPredicatesForAggregation(subCNFItem[0], groupByColumns, exprsOriginal) - } - exprCtx := la.SCtx().GetExprCtx() - for _, item := range subCNFItem { - condsToPushForItem, retForItem := la.pushDownDNFPredicatesForAggregation(item, groupByColumns, exprsOriginal) - if len(condsToPushForItem) > 0 { - condsToPush = append(condsToPush, expression.ComposeDNFCondition(exprCtx, condsToPushForItem...)) - } - if len(retForItem) > 0 { - ret = append(ret, expression.ComposeDNFCondition(exprCtx, retForItem...)) - } - } - return condsToPush, ret -} - -// pushDownDNFPredicatesForAggregation split a DNF condition to two parts, can be pushed-down or can not be pushed-down below aggregation. -// It would consider the DNF. -// For example, -// (a > 1 and avg(b) > 1) or (a < 3), and `avg(b) > 1` can't be pushed-down. 
-// Then condsToPush: (a < 3) and (a > 1), ret: (a > 1 and avg(b) > 1) or (a < 3) -func (la *LogicalAggregation) pushDownDNFPredicatesForAggregation(cond expression.Expression, groupByColumns *expression.Schema, exprsOriginal []expression.Expression) ([]expression.Expression, []expression.Expression) { - //nolint: prealloc - var condsToPush []expression.Expression - var ret []expression.Expression - subDNFItem := expression.SplitDNFItems(cond) - if len(subDNFItem) == 1 { - return la.pushDownPredicatesForAggregation(subDNFItem[0], groupByColumns, exprsOriginal) - } - exprCtx := la.SCtx().GetExprCtx() - for _, item := range subDNFItem { - condsToPushForItem, retForItem := la.pushDownCNFPredicatesForAggregation(item, groupByColumns, exprsOriginal) - if len(condsToPushForItem) <= 0 { - return nil, []expression.Expression{cond} - } - condsToPush = append(condsToPush, expression.ComposeCNFCondition(exprCtx, condsToPushForItem...)) - if len(retForItem) > 0 { - ret = append(ret, expression.ComposeCNFCondition(exprCtx, retForItem...)) - } - } - if len(ret) == 0 { - // All the condition can be pushed down. - return []expression.Expression{cond}, nil - } - dnfPushDownCond := expression.ComposeDNFCondition(exprCtx, condsToPush...) - // Some condition can't be pushed down, we need to keep all the condition. - return []expression.Expression{dnfPushDownCond}, []expression.Expression{cond} -} - -// splitCondForAggregation splits the condition into those who can be pushed and others. -func (la *LogicalAggregation) splitCondForAggregation(predicates []expression.Expression) ([]expression.Expression, []expression.Expression) { - var condsToPush []expression.Expression - var ret []expression.Expression - exprsOriginal := make([]expression.Expression, 0, len(la.AggFuncs)) - for _, fun := range la.AggFuncs { - exprsOriginal = append(exprsOriginal, fun.Args[0]) - } - groupByColumns := expression.NewSchema(la.GetGroupByCols()...) 
- // It's almost the same as pushDownCNFPredicatesForAggregation, except that the condition is a slice. - for _, cond := range predicates { - subCondsToPush, subRet := la.pushDownDNFPredicatesForAggregation(cond, groupByColumns, exprsOriginal) - if len(subCondsToPush) > 0 { - condsToPush = append(condsToPush, subCondsToPush...) - } - if len(subRet) > 0 { - ret = append(ret, subRet...) - } - } - return condsToPush, ret -} - -// PredicatePushDown implements base.LogicalPlan PredicatePushDown interface. -func (la *LogicalAggregation) PredicatePushDown(predicates []expression.Expression, opt *optimizetrace.LogicalOptimizeOp) ([]expression.Expression, base.LogicalPlan) { - condsToPush, ret := la.splitCondForAggregation(predicates) - la.BaseLogicalPlan.PredicatePushDown(condsToPush, opt) - return ret, la -} - // PredicatePushDown implements base.LogicalPlan PredicatePushDown interface. func (p *LogicalLimit) PredicatePushDown(predicates []expression.Expression, opt *optimizetrace.LogicalOptimizeOp) ([]expression.Expression, base.LogicalPlan) { // Limit forbids any condition to push down. diff --git a/pkg/planner/core/stats.go b/pkg/planner/core/stats.go index 55cd94009f..b70ca0e0af 100644 --- a/pkg/planner/core/stats.go +++ b/pkg/planner/core/stats.go @@ -685,66 +685,6 @@ func (p *LogicalProjection) ExtractColGroups(colGroups [][]*expression.Column) [ return extracted } -func (*LogicalAggregation) getGroupNDVs(colGroups [][]*expression.Column, childProfile *property.StatsInfo, gbyCols []*expression.Column) []property.GroupNDV { - if len(colGroups) == 0 { - return nil - } - // Check if the child profile provides GroupNDV for the GROUP BY columns. - // Note that gbyCols may not be the exact GROUP BY columns, e.g, GROUP BY a+b, - // but we have no other approaches for the NDV estimation of these cases - // except for using the independent assumption, unless we can use stats of expression index. 
- groupNDV := childProfile.GetGroupNDV4Cols(gbyCols) - if groupNDV == nil { - return nil - } - return []property.GroupNDV{*groupNDV} -} - -// DeriveStats implement LogicalPlan DeriveStats interface. -func (la *LogicalAggregation) DeriveStats(childStats []*property.StatsInfo, selfSchema *expression.Schema, childSchema []*expression.Schema, colGroups [][]*expression.Column) (*property.StatsInfo, error) { - childProfile := childStats[0] - gbyCols := make([]*expression.Column, 0, len(la.GroupByItems)) - for _, gbyExpr := range la.GroupByItems { - cols := expression.ExtractColumns(gbyExpr) - gbyCols = append(gbyCols, cols...) - } - if la.StatsInfo() != nil { - // Reload GroupNDVs since colGroups may have changed. - la.StatsInfo().GroupNDVs = la.getGroupNDVs(colGroups, childProfile, gbyCols) - return la.StatsInfo(), nil - } - ndv, _ := cardinality.EstimateColsNDVWithMatchedLen(gbyCols, childSchema[0], childProfile) - la.SetStats(&property.StatsInfo{ - RowCount: ndv, - ColNDVs: make(map[int64]float64, selfSchema.Len()), - }) - // We cannot estimate the ColNDVs for every output, so we use a conservative strategy. - for _, col := range selfSchema.Columns { - la.StatsInfo().ColNDVs[col.UniqueID] = ndv - } - la.InputCount = childProfile.RowCount - la.StatsInfo().GroupNDVs = la.getGroupNDVs(colGroups, childProfile, gbyCols) - return la.StatsInfo(), nil -} - -// ExtractColGroups implements LogicalPlan ExtractColGroups interface. -func (la *LogicalAggregation) ExtractColGroups(_ [][]*expression.Column) [][]*expression.Column { - // Parent colGroups would be dicarded, because aggregation would make NDV of colGroups - // which does not match GroupByItems invalid. - // Note that gbyCols may not be the exact GROUP BY columns, e.g, GROUP BY a+b, - // but we have no other approaches for the NDV estimation of these cases - // except for using the independent assumption, unless we can use stats of expression index. 
- gbyCols := make([]*expression.Column, 0, len(la.GroupByItems)) - for _, gbyExpr := range la.GroupByItems { - cols := expression.ExtractColumns(gbyExpr) - gbyCols = append(gbyCols, cols...) - } - if len(gbyCols) > 1 { - return [][]*expression.Column{expression.SortColumns(gbyCols)} - } - return nil -} - func (p *LogicalJoin) getGroupNDVs(colGroups [][]*expression.Column, childStats []*property.StatsInfo) []property.GroupNDV { outerIdx := int(-1) if p.JoinType == LeftOuterJoin || p.JoinType == LeftOuterSemiJoin || p.JoinType == AntiLeftOuterSemiJoin { diff --git a/pkg/planner/util/misc.go b/pkg/planner/util/misc.go index 3589a764ff..36e5c22282 100644 --- a/pkg/planner/util/misc.go +++ b/pkg/planner/util/misc.go @@ -96,3 +96,17 @@ func EncodeIntAsUint32(result []byte, value int) []byte { binary.BigEndian.PutUint32(buf[:], uint32(value)) return append(result, buf[:]...) } + +// GetMaxSortPrefix returns the prefix offset of sortCols in allCols. +func GetMaxSortPrefix(sortCols, allCols []*expression.Column) []int { + tmpSchema := expression.NewSchema(allCols...) 
+ sortColOffsets := make([]int, 0, len(sortCols)) + for _, sortCol := range sortCols { + offset := tmpSchema.ColumnIndex(sortCol) + if offset == -1 { + return sortColOffsets + } + sortColOffsets = append(sortColOffsets, offset) + } + return sortColOffsets +} diff --git a/pkg/planner/util/utilfuncp/BUILD.bazel b/pkg/planner/util/utilfuncp/BUILD.bazel index bd32e75d1e..070064ce1c 100644 --- a/pkg/planner/util/utilfuncp/BUILD.bazel +++ b/pkg/planner/util/utilfuncp/BUILD.bazel @@ -10,6 +10,7 @@ go_library( "//pkg/kv", "//pkg/planner/core/base", "//pkg/planner/property", + "//pkg/planner/util", "//pkg/planner/util/optimizetrace", ], ) diff --git a/pkg/planner/util/utilfuncp/func_pointer_misc.go b/pkg/planner/util/utilfuncp/func_pointer_misc.go index 06d8efd652..38b99a613c 100644 --- a/pkg/planner/util/utilfuncp/func_pointer_misc.go +++ b/pkg/planner/util/utilfuncp/func_pointer_misc.go @@ -19,6 +19,7 @@ import ( "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/planner/core/base" "github.com/pingcap/tidb/pkg/planner/property" + "github.com/pingcap/tidb/pkg/planner/util" "github.com/pingcap/tidb/pkg/planner/util/optimizetrace" ) @@ -68,5 +69,22 @@ var FindBestTask func(p base.LogicalPlan, prop *property.PhysicalProperty, planC // CanPushToCopImpl will be called by baseLogicalPlan in logicalOp pkg. The logic inside covers concrete logical // operators. -// todo: (7) arenatlx, remove this util func pointer when logical operators are all moved from core to logicalop. +// todo: (7) arenatlx, remove this util func pointer when logical operators are all moved from core to logicalOp. var CanPushToCopImpl func(p base.LogicalPlan, storeTp kv.StoreType, considerDual bool) bool + +// GetStreamAggs will be called by baseLogicalPlan in logicalOp pkg. The logic inside covers concrete physical +// operators. +// todo: (8) arenatlx, move this util func pointer to physicalOp when physical operators are all moved. 
+var GetStreamAggs func(lp base.LogicalPlan, prop *property.PhysicalProperty) []base.PhysicalPlan + +// GetHashAggs will be called by baseLogicalPlan in logicalOp pkg. The logic inside covers concrete physical +// operators. +// todo: (9) arenatlx, move this util func pointer to physicalOp when physical operators are all moved. +var GetHashAggs func(la base.LogicalPlan, prop *property.PhysicalProperty) []base.PhysicalPlan + +// PruneByItems will be called by baseLogicalPlan in logicalOp pkg. The logic currently lives in the rule logic +// inside core. +// todo: (10) arenatlx, when rule is moved out of core, we should directly reference rule.Func instead of this +// util func pointer. +var PruneByItems func(p base.LogicalPlan, old []*util.ByItems, opt *optimizetrace.LogicalOptimizeOp) ( + byItems []*util.ByItems, parentUsedCols []*expression.Column)