planner: classify logical aggregation logic into a separate file for later pkg move (#54187)
ref pingcap/tidb#51664, ref pingcap/tidb#52714
@@ -1865,3 +1865,30 @@ func ConstExprConsiderPlanCache(expr Expression, inPlanCache bool) bool {
		return false
	}
}
+
+// ExprsHasSideEffects checks if any of the expressions has side effects.
+func ExprsHasSideEffects(exprs []Expression) bool {
+	for _, expr := range exprs {
+		if ExprHasSetVarOrSleep(expr) {
+			return true
+		}
+	}
+	return false
+}
+
+// ExprHasSetVarOrSleep checks if the expression has SetVar function or Sleep function.
+func ExprHasSetVarOrSleep(expr Expression) bool {
+	scalaFunc, isScalaFunc := expr.(*ScalarFunction)
+	if !isScalaFunc {
+		return false
+	}
+	if scalaFunc.FuncName.L == ast.SetVar || scalaFunc.FuncName.L == ast.Sleep {
+		return true
+	}
+	for _, arg := range scalaFunc.GetArgs() {
+		if ExprHasSetVarOrSleep(arg) {
+			return true
+		}
+	}
+	return false
+}
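As an illustrative aside (not part of the diff): the two helpers above are a plain recursive tree walk. A minimal, self-contained sketch of the same shape, using a hypothetical `expr` type instead of the real `expression.Expression` interface:

```go
package main

import "fmt"

// expr is a hypothetical stand-in for expression.Expression: a name plus arguments.
type expr struct {
	name string
	args []expr
}

// hasSetVarOrSleep mirrors the recursive shape of ExprHasSetVarOrSleep above:
// report true if this node, or any node in an argument subtree, is setvar or sleep.
func hasSetVarOrSleep(e expr) bool {
	if e.name == "setvar" || e.name == "sleep" {
		return true
	}
	for _, arg := range e.args {
		if hasSetVarOrSleep(arg) {
			return true
		}
	}
	return false
}

func main() {
	// plus(a, sleep(1)) hides a side effect one level down.
	e := expr{name: "plus", args: []expr{{name: "a"}, {name: "sleep", args: []expr{{name: "1"}}}}}
	fmt.Println(hasSetVarOrSleep(e)) // true
}
```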
@@ -1165,7 +1165,7 @@ func (*MergeAdjacentProjection) OnTransform(old *memo.ExprIter) (newExprs []*mem
	proj := old.GetExpr().ExprNode.(*plannercore.LogicalProjection)
	childGroup := old.Children[0].Group
	child := old.Children[0].GetExpr().ExprNode.(*plannercore.LogicalProjection)
-	if plannercore.ExprsHasSideEffects(child.Exprs) {
+	if expression.ExprsHasSideEffects(child.Exprs) {
		return nil, false, false, nil
	}

@@ -1517,7 +1517,7 @@ func NewRuleMergeAggregationProjection() Transformation {
// Match implements Transformation interface.
func (*MergeAggregationProjection) Match(old *memo.ExprIter) bool {
	proj := old.Children[0].GetExpr().ExprNode.(*plannercore.LogicalProjection)
-	return !plannercore.ExprsHasSideEffects(proj.Exprs)
+	return !expression.ExprsHasSideEffects(proj.Exprs)
}

// OnTransform implements Transformation interface.

@@ -21,6 +21,8 @@ go_library(
	"indexmerge_path.go",
	"indexmerge_unfinished_path.go",
	"initialize.go",
+	"logical_aggregation.go",
+	"logical_initialize.go",
	"logical_plan_builder.go",
	"logical_plans.go",
	"memtable_predicate_extractor.go",

@@ -32,6 +32,10 @@ func init() {
	utilfuncp.HasMaxOneRowUtil = HasMaxOneRow
	utilfuncp.GetTaskPlanCost = getTaskPlanCost
	utilfuncp.CanPushToCopImpl = canPushToCopImpl
+	utilfuncp.GetStreamAggs = getStreamAggs
+	utilfuncp.GetHashAggs = getHashAggs
	utilfuncp.PruneByItems = pruneByItems

	utilfuncp.AppendCandidate4PhysicalOptimizeOp = appendCandidate4PhysicalOptimizeOp
	utilfuncp.PushDownTopNForBaseLogicalPlan = pushDownTopNForBaseLogicalPlan

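An illustrative aside (not part of the diff): `utilfuncp` is a registry of function pointers that `core`'s `init()` fills in, so lower-level planner packages can call back into `core` without an import cycle. A minimal sketch of the pattern — all names here are hypothetical stand-ins:

```go
package main

import "fmt"

// GetHashAggs plays the role of utilfuncp.GetHashAggs: a package declares the
// hook variable, and a higher-level package assigns the implementation later.
var GetHashAggs func(op string) []string

func init() {
	// Mirrors `utilfuncp.GetHashAggs = getHashAggs` in the hunk above.
	GetHashAggs = func(op string) []string {
		return []string{"PhysicalHashAgg(" + op + ")"}
	}
}

func main() {
	// Callers only see the function variable, so no import cycle is needed.
	fmt.Println(GetHashAggs("LogicalAggregation"))
}
```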
@@ -67,19 +67,6 @@ func (p *LogicalUnionScan) ExhaustPhysicalPlans(prop *property.PhysicalProperty)
	return []base.PhysicalPlan{us}, true, nil
}

-func getMaxSortPrefix(sortCols, allCols []*expression.Column) []int {
-	tmpSchema := expression.NewSchema(allCols...)
-	sortColOffsets := make([]int, 0, len(sortCols))
-	for _, sortCol := range sortCols {
-		offset := tmpSchema.ColumnIndex(sortCol)
-		if offset == -1 {
-			return sortColOffsets
-		}
-		sortColOffsets = append(sortColOffsets, offset)
-	}
-	return sortColOffsets
-}
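An illustrative aside (not part of the diff): this helper — removed here and, per the call sites below, now reachable as `util.GetMaxSortPrefix` — returns the offsets of the longest prefix of `sortCols` whose columns all appear in `allCols`, stopping at the first miss. A hypothetical, self-contained rendering using ints as stand-in columns:

```go
package main

import "fmt"

// getMaxSortPrefixInts mirrors the logic of getMaxSortPrefix above.
func getMaxSortPrefixInts(sortCols, allCols []int) []int {
	index := make(map[int]int, len(allCols)) // column -> offset, like expression.Schema.ColumnIndex
	for i, c := range allCols {
		index[c] = i
	}
	offsets := make([]int, 0, len(sortCols))
	for _, c := range sortCols {
		off, ok := index[c]
		if !ok {
			return offsets // stop at the first sort column missing from allCols
		}
		offsets = append(offsets, off)
	}
	return offsets
}

func main() {
	// Columns 20 and 10 are found (offsets 1 and 0); 40 is not, so the prefix ends there.
	fmt.Println(getMaxSortPrefixInts([]int{20, 10, 40}, []int{10, 20, 30})) // [1 0]
}
```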

func findMaxPrefixLen(candidates [][]*expression.Column, keys []*expression.Column) int {
	maxLen := 0
	for _, candidateKeys := range candidates {

@@ -183,7 +170,7 @@ func (p *LogicalJoin) GetMergeJoin(prop *property.PhysicalProperty, schema *expr
		return nil
	}
	for _, lhsChildProperty := range p.LeftProperties {
-		offsets := getMaxSortPrefix(lhsChildProperty, leftJoinKeys)
+		offsets := util.GetMaxSortPrefix(lhsChildProperty, leftJoinKeys)
		// If not all equal conditions hit properties, we ban merge join heuristically, because in this case merge join
		// may get very low performance: the executor computes join results before other conditions filter them.
		if len(offsets) < len(leftJoinKeys) {

@@ -1459,7 +1446,7 @@ func (p *LogicalJoin) constructIndexJoinInnerSideTask(dsCopTask *CopTask, ds *Da
		if len(groupByCols) != len(la.GroupByItems) {
			preferStream = false
		}
-		if la.HasDistinct() && !la.distinctArgsMeetsProperty() {
+		if la.HasDistinct() && !la.DistinctArgsMeetsProperty() {
			preferStream = false
		}
		// sort items must be the super set of group by items

@@ -3180,11 +3167,6 @@ func canPushToCopImpl(lp base.LogicalPlan, storeTp kv.StoreType, considerDual bo
	return ret
}

-// CanPushToCop implements LogicalPlan interface.
-func (la *LogicalAggregation) CanPushToCop(storeTp kv.StoreType) bool {
-	return la.BaseLogicalPlan.CanPushToCop(storeTp) && !la.NoCopPushDown
-}

func getEnforcedStreamAggs(la *LogicalAggregation, prop *property.PhysicalProperty) []base.PhysicalPlan {
	if prop.IsFlashProp() {
		return nil

@@ -3230,20 +3212,8 @@ func getEnforcedStreamAggs(la *LogicalAggregation, prop *property.PhysicalProper
	return enforcedAggs
}

-func (la *LogicalAggregation) distinctArgsMeetsProperty() bool {
-	for _, aggFunc := range la.AggFuncs {
-		if aggFunc.HasDistinct {
-			for _, distinctArg := range aggFunc.Args {
-				if !expression.Contains(la.GroupByItems, distinctArg) {
-					return false
-				}
-			}
-		}
-	}
-	return true
-}

-func getStreamAggs(la *LogicalAggregation, prop *property.PhysicalProperty) []base.PhysicalPlan {
+func getStreamAggs(lp base.LogicalPlan, prop *property.PhysicalProperty) []base.PhysicalPlan {
+	la := lp.(*LogicalAggregation)
	// TODO: support CopTiFlash task type in stream agg
	if prop.IsFlashProp() {
		return nil

@@ -3285,7 +3255,7 @@ func getStreamAggs(la *LogicalAggregation, prop *property.PhysicalProperty) []ba
			// if variable doesn't allow DistinctAggPushDown, just produce root task type.
			// if variable does allow DistinctAggPushDown, but OP itself can't be pushed down to tikv, just produce root task type.
			taskTypes = []property.TaskType{property.RootTaskType}
-		} else if !la.distinctArgsMeetsProperty() {
+		} else if !la.DistinctArgsMeetsProperty() {
			continue
		}
	} else if !la.PreferAggToCop {

@@ -3321,7 +3291,7 @@ func getStreamAggs(la *LogicalAggregation, prop *property.PhysicalProperty) []ba
}

// TODO: support more operators and distinct later
-func (la *LogicalAggregation) checkCanPushDownToMPP() bool {
+func checkCanPushDownToMPP(la *LogicalAggregation) bool {
	hasUnsupportedDistinct := false
	for _, agg := range la.AggFuncs {
		// MPP does not support distinct except count distinct now

@@ -3494,17 +3464,18 @@ func tryToGetMppHashAggs(la *LogicalAggregation, prop *property.PhysicalProperty
// for 2, whether the final result of this physical operator enumeration is chosen or rejected depends on more factors later (hint/variable/partition/virtual-col/cost)
//
// That is to say, the incomplete positive judgement of canPushDownToMPP/canPushDownToTiFlash/canPushDownToTiKV is not final here.
-func getHashAggs(la *LogicalAggregation, prop *property.PhysicalProperty) []base.PhysicalPlan {
+func getHashAggs(lp base.LogicalPlan, prop *property.PhysicalProperty) []base.PhysicalPlan {
+	la := lp.(*LogicalAggregation)
	if !prop.IsSortItemEmpty() {
		return nil
	}
-	if prop.TaskTp == property.MppTaskType && !la.checkCanPushDownToMPP() {
+	if prop.TaskTp == property.MppTaskType && !checkCanPushDownToMPP(la) {
		return nil
	}
	hashAggs := make([]base.PhysicalPlan, 0, len(prop.GetAllPossibleChildTaskTypes()))
	taskTypes := []property.TaskType{property.CopSingleReadTaskType, property.CopMultiReadTaskType}
	canPushDownToTiFlash := la.CanPushToCop(kv.TiFlash)
-	canPushDownToMPP := canPushDownToTiFlash && la.SCtx().GetSessionVars().IsMPPAllowed() && la.checkCanPushDownToMPP()
+	canPushDownToMPP := canPushDownToTiFlash && la.SCtx().GetSessionVars().IsMPPAllowed() && checkCanPushDownToMPP(la)
	if la.HasDistinct() {
		// TODO: remove after the cost estimation of distinct pushdown is implemented.
		if !la.SCtx().GetSessionVars().AllowDistinctAggPushDown || !la.CanPushToCop(kv.TiKV) {

@@ -3554,50 +3525,6 @@ func getHashAggs(la *LogicalAggregation, prop *property.PhysicalProperty) []base
	return hashAggs
}

-// ResetHintIfConflicted resets the PreferAggType if they are conflicted,
-// and returns the two PreferAggType hints.
-func (la *LogicalAggregation) ResetHintIfConflicted() (preferHash bool, preferStream bool) {
-	preferHash = (la.PreferAggType & h.PreferHashAgg) > 0
-	preferStream = (la.PreferAggType & h.PreferStreamAgg) > 0
-	if preferHash && preferStream {
-		la.SCtx().GetSessionVars().StmtCtx.SetHintWarning("Optimizer aggregation hints are conflicted")
-		la.PreferAggType = 0
-		preferHash, preferStream = false, false
-	}
-	return
-}

-// ExhaustPhysicalPlans implements LogicalPlan interface.
-func (la *LogicalAggregation) ExhaustPhysicalPlans(prop *property.PhysicalProperty) ([]base.PhysicalPlan, bool, error) {
-	if la.PreferAggToCop {
-		if !la.CanPushToCop(kv.TiKV) {
-			la.SCtx().GetSessionVars().StmtCtx.SetHintWarning(
-				"Optimizer Hint AGG_TO_COP is inapplicable")
-			la.PreferAggToCop = false
-		}
-	}

-	preferHash, preferStream := la.ResetHintIfConflicted()

-	hashAggs := getHashAggs(la, prop)
-	if hashAggs != nil && preferHash {
-		return hashAggs, true, nil
-	}

-	streamAggs := getStreamAggs(la, prop)
-	if streamAggs != nil && preferStream {
-		return streamAggs, true, nil
-	}

-	aggs := append(hashAggs, streamAggs...)

-	if streamAggs == nil && preferStream && !prop.IsSortItemEmpty() {
-		la.SCtx().GetSessionVars().StmtCtx.SetHintWarning("Optimizer Hint STREAM_AGG is inapplicable")
-	}

-	return aggs, !(preferStream || preferHash), nil
-}

// ExhaustPhysicalPlans implements LogicalPlan interface.
func (p *LogicalSelection) ExhaustPhysicalPlans(prop *property.PhysicalProperty) ([]base.PhysicalPlan, bool, error) {
	newProps := make([]*property.PhysicalProperty, 0, 2)

@@ -897,25 +897,6 @@ func (p *LogicalJoin) ExplainInfo() string {
	return buffer.String()
}

-// ExplainInfo implements Plan interface.
-func (p *LogicalAggregation) ExplainInfo() string {
-	buffer := bytes.NewBufferString("")
-	if len(p.GroupByItems) > 0 {
-		fmt.Fprintf(buffer, "group by:%s, ",
-			expression.SortedExplainExpressionList(p.SCtx().GetExprCtx().GetEvalCtx(), p.GroupByItems))
-	}
-	if len(p.AggFuncs) > 0 {
-		buffer.WriteString("funcs:")
-		for i, agg := range p.AggFuncs {
-			buffer.WriteString(aggregation.ExplainAggFunc(p.SCtx().GetExprCtx().GetEvalCtx(), agg, false))
-			if i+1 < len(p.AggFuncs) {
-				buffer.WriteString(", ")
-			}
-		}
-	}
-	return buffer.String()
-}

// ExplainInfo implements Plan interface.
func (p *LogicalProjection) ExplainInfo() string {
	return expression.ExplainExpressionList(p.Exprs, p.Schema())

@@ -19,61 +19,12 @@ import (
	"github.com/pingcap/tidb/pkg/kv"
	"github.com/pingcap/tidb/pkg/planner/core/base"
	"github.com/pingcap/tidb/pkg/planner/core/operator/baseimpl"
-	"github.com/pingcap/tidb/pkg/planner/core/operator/logicalop"
	"github.com/pingcap/tidb/pkg/planner/property"
	"github.com/pingcap/tidb/pkg/types"
	"github.com/pingcap/tidb/pkg/util/plancodec"
	"github.com/pingcap/tidb/pkg/util/size"
)

-// Init initializes LogicalAggregation.
-func (la LogicalAggregation) Init(ctx base.PlanContext, offset int) *LogicalAggregation {
-	la.BaseLogicalPlan = logicalop.NewBaseLogicalPlan(ctx, plancodec.TypeAgg, &la, offset)
-	return &la
-}

-// Init initializes LogicalJoin.
-func (p LogicalJoin) Init(ctx base.PlanContext, offset int) *LogicalJoin {
-	p.BaseLogicalPlan = logicalop.NewBaseLogicalPlan(ctx, plancodec.TypeJoin, &p, offset)
-	return &p
-}

-// Init initializes DataSource.
-func (ds DataSource) Init(ctx base.PlanContext, offset int) *DataSource {
-	ds.BaseLogicalPlan = logicalop.NewBaseLogicalPlan(ctx, plancodec.TypeDataSource, &ds, offset)
-	return &ds
-}

-// Init initializes TiKVSingleGather.
-func (sg TiKVSingleGather) Init(ctx base.PlanContext, offset int) *TiKVSingleGather {
-	sg.BaseLogicalPlan = logicalop.NewBaseLogicalPlan(ctx, plancodec.TypeTiKVSingleGather, &sg, offset)
-	return &sg
-}

-// Init initializes LogicalTableScan.
-func (ts LogicalTableScan) Init(ctx base.PlanContext, offset int) *LogicalTableScan {
-	ts.BaseLogicalPlan = logicalop.NewBaseLogicalPlan(ctx, plancodec.TypeTableScan, &ts, offset)
-	return &ts
-}

-// Init initializes LogicalIndexScan.
-func (is LogicalIndexScan) Init(ctx base.PlanContext, offset int) *LogicalIndexScan {
-	is.BaseLogicalPlan = logicalop.NewBaseLogicalPlan(ctx, plancodec.TypeIdxScan, &is, offset)
-	return &is
-}

-// Init initializes LogicalApply.
-func (la LogicalApply) Init(ctx base.PlanContext, offset int) *LogicalApply {
-	la.BaseLogicalPlan = logicalop.NewBaseLogicalPlan(ctx, plancodec.TypeApply, &la, offset)
-	return &la
-}

-// Init initializes LogicalSelection.
-func (p LogicalSelection) Init(ctx base.PlanContext, qbOffset int) *LogicalSelection {
-	p.BaseLogicalPlan = logicalop.NewBaseLogicalPlan(ctx, plancodec.TypeSel, &p, qbOffset)
-	return &p
-}

// Init initializes PhysicalSelection.
func (p PhysicalSelection) Init(ctx base.PlanContext, stats *property.StatsInfo, qbOffset int, props ...*property.PhysicalProperty) *PhysicalSelection {
	p.basePhysicalPlan = newBasePhysicalPlan(ctx, plancodec.TypeSel, &p, qbOffset)

@@ -82,24 +33,6 @@ func (p PhysicalSelection) Init(ctx base.PlanContext, stats *property.StatsInfo,
	return &p
}

-// Init initializes LogicalUnionScan.
-func (p LogicalUnionScan) Init(ctx base.PlanContext, qbOffset int) *LogicalUnionScan {
-	p.BaseLogicalPlan = logicalop.NewBaseLogicalPlan(ctx, plancodec.TypeUnionScan, &p, qbOffset)
-	return &p
-}

-// Init initializes LogicalProjection.
-func (p LogicalProjection) Init(ctx base.PlanContext, qbOffset int) *LogicalProjection {
-	p.BaseLogicalPlan = logicalop.NewBaseLogicalPlan(ctx, plancodec.TypeProj, &p, qbOffset)
-	return &p
-}

-// Init initializes LogicalExpand.
-func (p LogicalExpand) Init(ctx base.PlanContext, offset int) *LogicalExpand {
-	p.BaseLogicalPlan = logicalop.NewBaseLogicalPlan(ctx, plancodec.TypeExpand, &p, offset)
-	return &p
-}

// Init initializes PhysicalProjection.
func (p PhysicalProjection) Init(ctx base.PlanContext, stats *property.StatsInfo, offset int, props ...*property.PhysicalProperty) *PhysicalProjection {
	p.basePhysicalPlan = newBasePhysicalPlan(ctx, plancodec.TypeProj, &p, offset)

@@ -108,18 +41,6 @@ func (p PhysicalProjection) Init(ctx base.PlanContext, stats *property.StatsInfo
	return &p
}

-// Init initializes LogicalUnionAll.
-func (p LogicalUnionAll) Init(ctx base.PlanContext, offset int) *LogicalUnionAll {
-	p.BaseLogicalPlan = logicalop.NewBaseLogicalPlan(ctx, plancodec.TypeUnion, &p, offset)
-	return &p
-}

-// Init initializes LogicalPartitionUnionAll.
-func (p LogicalPartitionUnionAll) Init(ctx base.PlanContext, offset int) *LogicalPartitionUnionAll {
-	p.BaseLogicalPlan = logicalop.NewBaseLogicalPlan(ctx, plancodec.TypePartitionUnion, &p, offset)
-	return &p
-}

// Init initializes PhysicalUnionAll.
func (p PhysicalUnionAll) Init(ctx base.PlanContext, stats *property.StatsInfo, offset int, props ...*property.PhysicalProperty) *PhysicalUnionAll {
	p.basePhysicalPlan = newBasePhysicalPlan(ctx, plancodec.TypeUnion, &p, offset)

@@ -128,12 +49,6 @@ func (p PhysicalUnionAll) Init(ctx base.PlanContext, stats *property.StatsInfo,
	return &p
}

-// Init initializes LogicalSort.
-func (ls LogicalSort) Init(ctx base.PlanContext, offset int) *LogicalSort {
-	ls.BaseLogicalPlan = logicalop.NewBaseLogicalPlan(ctx, plancodec.TypeSort, &ls, offset)
-	return &ls
-}

// Init initializes PhysicalSort.
func (p PhysicalSort) Init(ctx base.PlanContext, stats *property.StatsInfo, offset int, props ...*property.PhysicalProperty) *PhysicalSort {
	p.basePhysicalPlan = newBasePhysicalPlan(ctx, plancodec.TypeSort, &p, offset)

@@ -150,12 +65,6 @@ func (p NominalSort) Init(ctx base.PlanContext, stats *property.StatsInfo, offse
	return &p
}

-// Init initializes LogicalTopN.
-func (lt LogicalTopN) Init(ctx base.PlanContext, offset int) *LogicalTopN {
-	lt.BaseLogicalPlan = logicalop.NewBaseLogicalPlan(ctx, plancodec.TypeTopN, &lt, offset)
-	return &lt
-}

// Init initializes PhysicalTopN.
func (p PhysicalTopN) Init(ctx base.PlanContext, stats *property.StatsInfo, offset int, props ...*property.PhysicalProperty) *PhysicalTopN {
	p.basePhysicalPlan = newBasePhysicalPlan(ctx, plancodec.TypeTopN, &p, offset)

@@ -164,12 +73,6 @@ func (p PhysicalTopN) Init(ctx base.PlanContext, stats *property.StatsInfo, offs
	return &p
}

-// Init initializes LogicalLimit.
-func (p LogicalLimit) Init(ctx base.PlanContext, offset int) *LogicalLimit {
-	p.BaseLogicalPlan = logicalop.NewBaseLogicalPlan(ctx, plancodec.TypeLimit, &p, offset)
-	return &p
-}

// Init initializes PhysicalLimit.
func (p PhysicalLimit) Init(ctx base.PlanContext, stats *property.StatsInfo, offset int, props ...*property.PhysicalProperty) *PhysicalLimit {
	p.basePhysicalPlan = newBasePhysicalPlan(ctx, plancodec.TypeLimit, &p, offset)

@@ -178,12 +81,6 @@ func (p PhysicalLimit) Init(ctx base.PlanContext, stats *property.StatsInfo, off
	return &p
}

-// Init initializes LogicalTableDual.
-func (p LogicalTableDual) Init(ctx base.PlanContext, offset int) *LogicalTableDual {
-	p.BaseLogicalPlan = logicalop.NewBaseLogicalPlan(ctx, plancodec.TypeDual, &p, offset)
-	return &p
-}

// Init initializes PhysicalTableDual.
func (p PhysicalTableDual) Init(ctx base.PlanContext, stats *property.StatsInfo, offset int) *PhysicalTableDual {
	p.basePhysicalPlan = newBasePhysicalPlan(ctx, plancodec.TypeDual, &p, offset)

@@ -191,12 +88,6 @@ func (p PhysicalTableDual) Init(ctx base.PlanContext, stats *property.StatsInfo,
	return &p
}

-// Init initializes LogicalMaxOneRow.
-func (p LogicalMaxOneRow) Init(ctx base.PlanContext, offset int) *LogicalMaxOneRow {
-	p.BaseLogicalPlan = logicalop.NewBaseLogicalPlan(ctx, plancodec.TypeMaxOneRow, &p, offset)
-	return &p
-}

// Init initializes PhysicalMaxOneRow.
func (p PhysicalMaxOneRow) Init(ctx base.PlanContext, stats *property.StatsInfo, offset int, props ...*property.PhysicalProperty) *PhysicalMaxOneRow {
	p.basePhysicalPlan = newBasePhysicalPlan(ctx, plancodec.TypeMaxOneRow, &p, offset)

@@ -205,12 +96,6 @@ func (p PhysicalMaxOneRow) Init(ctx base.PlanContext, stats *property.StatsInfo,
	return &p
}

-// Init initializes LogicalWindow.
-func (p LogicalWindow) Init(ctx base.PlanContext, offset int) *LogicalWindow {
-	p.BaseLogicalPlan = logicalop.NewBaseLogicalPlan(ctx, plancodec.TypeWindow, &p, offset)
-	return &p
-}

// Init initializes PhysicalWindow.
func (p PhysicalWindow) Init(ctx base.PlanContext, stats *property.StatsInfo, offset int, props ...*property.PhysicalProperty) *PhysicalWindow {
	p.basePhysicalPlan = newBasePhysicalPlan(ctx, plancodec.TypeWindow, &p, offset)

@@ -265,18 +150,6 @@ func (p ImportInto) Init(ctx base.PlanContext) *ImportInto {
	return &p
}

-// Init initializes LogicalShow.
-func (p LogicalShow) Init(ctx base.PlanContext) *LogicalShow {
-	p.BaseLogicalPlan = logicalop.NewBaseLogicalPlan(ctx, plancodec.TypeShow, &p, 0)
-	return &p
-}

-// Init initializes LogicalShowDDLJobs.
-func (p LogicalShowDDLJobs) Init(ctx base.PlanContext) *LogicalShowDDLJobs {
-	p.BaseLogicalPlan = logicalop.NewBaseLogicalPlan(ctx, plancodec.TypeShowDDLJobs, &p, 0)
-	return &p
-}

// Init initializes PhysicalShow.
func (p PhysicalShow) Init(ctx base.PlanContext) *PhysicalShow {
	p.basePhysicalPlan = newBasePhysicalPlan(ctx, plancodec.TypeShow, &p, 0)

@@ -293,12 +166,6 @@ func (p PhysicalShowDDLJobs) Init(ctx base.PlanContext) *PhysicalShowDDLJobs {
	return &p
}

-// Init initializes LogicalLock.
-func (p LogicalLock) Init(ctx base.PlanContext) *LogicalLock {
-	p.BaseLogicalPlan = logicalop.NewBaseLogicalPlan(ctx, plancodec.TypeLock, &p, 0)
-	return &p
-}

// Init initializes PhysicalLock.
func (p PhysicalLock) Init(ctx base.PlanContext, stats *property.StatsInfo, props ...*property.PhysicalProperty) *PhysicalLock {
	p.basePhysicalPlan = newBasePhysicalPlan(ctx, plancodec.TypeLock, &p, 0)

@@ -319,12 +186,6 @@ func (p PhysicalIndexScan) Init(ctx base.PlanContext, offset int) *PhysicalIndex
	return &p
}

-// Init initializes LogicalMemTable.
-func (p LogicalMemTable) Init(ctx base.PlanContext, offset int) *LogicalMemTable {
-	p.BaseLogicalPlan = logicalop.NewBaseLogicalPlan(ctx, plancodec.TypeMemTableScan, &p, offset)
-	return &p
-}

// Init initializes PhysicalMemTable.
func (p PhysicalMemTable) Init(ctx base.PlanContext, stats *property.StatsInfo, offset int) *PhysicalMemTable {
	p.basePhysicalPlan = newBasePhysicalPlan(ctx, plancodec.TypeMemTableScan, &p, offset)

@@ -594,12 +455,6 @@ func flattenPushDownPlan(p base.PhysicalPlan) []base.PhysicalPlan {
	return plans
}

-// Init only assigns type and context.
-func (p LogicalCTE) Init(ctx base.PlanContext, offset int) *LogicalCTE {
-	p.BaseLogicalPlan = logicalop.NewBaseLogicalPlan(ctx, plancodec.TypeCTE, &p, offset)
-	return &p
-}

// Init only assigns type and context.
func (p PhysicalCTE) Init(ctx base.PlanContext, stats *property.StatsInfo) *PhysicalCTE {
	p.basePhysicalPlan = newBasePhysicalPlan(ctx, plancodec.TypeCTE, &p, 0)

@@ -607,12 +462,6 @@ func (p PhysicalCTE) Init(ctx base.PlanContext, stats *property.StatsInfo) *Phys
	return &p
}

-// Init only assigns type and context.
-func (p LogicalCTETable) Init(ctx base.PlanContext, offset int) *LogicalCTETable {
-	p.BaseLogicalPlan = logicalop.NewBaseLogicalPlan(ctx, plancodec.TypeCTETable, &p, offset)
-	return &p
-}

// Init only assigns type and context.
func (p PhysicalCTETable) Init(ctx base.PlanContext, stats *property.StatsInfo) *PhysicalCTETable {
	p.Plan = baseimpl.NewBasePlan(ctx, plancodec.TypeCTETable, 0)

@@ -634,12 +483,6 @@ func (p FKCascade) Init(ctx base.PlanContext) *FKCascade {
	return &p
}

-// Init initializes LogicalSequence
-func (p LogicalSequence) Init(ctx base.PlanContext, offset int) *LogicalSequence {
-	p.BaseLogicalPlan = logicalop.NewBaseLogicalPlan(ctx, plancodec.TypeSequence, &p, offset)
-	return &p
-}

// Init initializes PhysicalSequence
func (p PhysicalSequence) Init(ctx base.PlanContext, stats *property.StatsInfo, blockOffset int, props ...*property.PhysicalProperty) *PhysicalSequence {
	p.basePhysicalPlan = newBasePhysicalPlan(ctx, plancodec.TypeSequence, &p, blockOffset)

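An illustrative aside (not part of the diff): every Init above takes a value receiver on purpose — the receiver is already a private copy, so filling it in and returning its address hands back a fresh, heap-allocated plan node on each call. A minimal sketch of the idiom with hypothetical types:

```go
package main

import "fmt"

// basePlan and logicalDual are hypothetical stand-ins for the planner types.
type basePlan struct{ tp string }

type logicalDual struct {
	basePlan
	offset int
}

// Init uses a value receiver, like the Init methods above: p is a copy, so
// mutating it and returning &p yields an independently owned node per call.
func (p logicalDual) Init(offset int) *logicalDual {
	p.basePlan = basePlan{tp: "Dual"}
	p.offset = offset
	return &p
}

func main() {
	a := logicalDual{}.Init(1)
	b := logicalDual{}.Init(2)
	fmt.Println(a.offset, b.offset, a != b) // 1 2 true
}
```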
pkg/planner/core/logical_aggregation.go (new file, 763 lines)
@@ -0,0 +1,763 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package core

import (
	"bytes"
	"fmt"

	"github.com/pingcap/tidb/pkg/expression"
	"github.com/pingcap/tidb/pkg/expression/aggregation"
	"github.com/pingcap/tidb/pkg/kv"
	"github.com/pingcap/tidb/pkg/parser/ast"
	"github.com/pingcap/tidb/pkg/planner/cardinality"
	"github.com/pingcap/tidb/pkg/planner/core/base"
	"github.com/pingcap/tidb/pkg/planner/core/operator/logicalop"
	fd "github.com/pingcap/tidb/pkg/planner/funcdep"
	"github.com/pingcap/tidb/pkg/planner/property"
	"github.com/pingcap/tidb/pkg/planner/util"
	"github.com/pingcap/tidb/pkg/planner/util/optimizetrace"
	"github.com/pingcap/tidb/pkg/planner/util/optimizetrace/logicaltrace"
	"github.com/pingcap/tidb/pkg/planner/util/utilfuncp"
	h "github.com/pingcap/tidb/pkg/util/hint"
	"github.com/pingcap/tidb/pkg/util/intset"
	"github.com/pingcap/tidb/pkg/util/plancodec"
)

// LogicalAggregation represents an aggregate plan.
type LogicalAggregation struct {
	logicalop.LogicalSchemaProducer

	AggFuncs     []*aggregation.AggFuncDesc
	GroupByItems []expression.Expression

	// PreferAggType and PreferAggToCop store aggregation hint information.
	PreferAggType  uint
	PreferAggToCop bool

	PossibleProperties [][]*expression.Column
	InputCount         float64 // InputCount is the input count of this plan.

	// NoCopPushDown indicates if planner must not push this agg down to coprocessor.
	// It is true when the agg is in the outer child tree of apply.
	NoCopPushDown bool
}

// Init initializes LogicalAggregation.
func (la LogicalAggregation) Init(ctx base.PlanContext, offset int) *LogicalAggregation {
	la.BaseLogicalPlan = logicalop.NewBaseLogicalPlan(ctx, plancodec.TypeAgg, &la, offset)
	return &la
}

// *************************** start implementation of Plan interface ***************************

// ExplainInfo implements base.Plan.<4th> interface.
func (la *LogicalAggregation) ExplainInfo() string {
	buffer := bytes.NewBufferString("")
	if len(la.GroupByItems) > 0 {
		fmt.Fprintf(buffer, "group by:%s, ",
			expression.SortedExplainExpressionList(la.SCtx().GetExprCtx().GetEvalCtx(), la.GroupByItems))
	}
	if len(la.AggFuncs) > 0 {
		buffer.WriteString("funcs:")
		for i, agg := range la.AggFuncs {
			buffer.WriteString(aggregation.ExplainAggFunc(la.SCtx().GetExprCtx().GetEvalCtx(), agg, false))
			if i+1 < len(la.AggFuncs) {
				buffer.WriteString(", ")
			}
		}
	}
	return buffer.String()
}

// ReplaceExprColumns implements base.Plan.<5th> interface.
func (la *LogicalAggregation) ReplaceExprColumns(replace map[string]*expression.Column) {
	for _, agg := range la.AggFuncs {
		for _, aggExpr := range agg.Args {
			ResolveExprAndReplace(aggExpr, replace)
		}
		for _, orderExpr := range agg.OrderByItems {
			ResolveExprAndReplace(orderExpr.Expr, replace)
		}
	}
	for _, gbyItem := range la.GroupByItems {
		ResolveExprAndReplace(gbyItem, replace)
	}
}

// *************************** end implementation of Plan interface ***************************

// *************************** start implementation of logicalPlan interface ***************************

// HashCode inherits BaseLogicalPlan.LogicalPlan.<0th> implementation.

// PredicatePushDown implements base.LogicalPlan.<1st> interface.
func (la *LogicalAggregation) PredicatePushDown(predicates []expression.Expression, opt *optimizetrace.LogicalOptimizeOp) ([]expression.Expression, base.LogicalPlan) {
	condsToPush, ret := la.splitCondForAggregation(predicates)
	la.BaseLogicalPlan.PredicatePushDown(condsToPush, opt)
	return ret, la
}

// PruneColumns implements base.LogicalPlan.<2nd> interface.
func (la *LogicalAggregation) PruneColumns(parentUsedCols []*expression.Column, opt *optimizetrace.LogicalOptimizeOp) (base.LogicalPlan, error) {
	child := la.Children()[0]
	used := expression.GetUsedList(la.SCtx().GetExprCtx().GetEvalCtx(), parentUsedCols, la.Schema())
	prunedColumns := make([]*expression.Column, 0)
	prunedFunctions := make([]*aggregation.AggFuncDesc, 0)
	prunedGroupByItems := make([]expression.Expression, 0)

	allFirstRow := true
	allRemainFirstRow := true
	for i := len(used) - 1; i >= 0; i-- {
		if la.AggFuncs[i].Name != ast.AggFuncFirstRow {
			allFirstRow = false
		}
		if !used[i] && !expression.ExprsHasSideEffects(la.AggFuncs[i].Args) {
			prunedColumns = append(prunedColumns, la.Schema().Columns[i])
			prunedFunctions = append(prunedFunctions, la.AggFuncs[i])
			la.Schema().Columns = append(la.Schema().Columns[:i], la.Schema().Columns[i+1:]...)
			la.AggFuncs = append(la.AggFuncs[:i], la.AggFuncs[i+1:]...)
		} else if la.AggFuncs[i].Name != ast.AggFuncFirstRow {
			allRemainFirstRow = false
		}
	}
	logicaltrace.AppendColumnPruneTraceStep(la, prunedColumns, opt)
	logicaltrace.AppendFunctionPruneTraceStep(la, prunedFunctions, opt)
	//nolint: prealloc
	var selfUsedCols []*expression.Column
	for _, aggrFunc := range la.AggFuncs {
		selfUsedCols = expression.ExtractColumnsFromExpressions(selfUsedCols, aggrFunc.Args, nil)

		var cols []*expression.Column
		aggrFunc.OrderByItems, cols = utilfuncp.PruneByItems(la, aggrFunc.OrderByItems, opt)
		selfUsedCols = append(selfUsedCols, cols...)
	}
	if len(la.AggFuncs) == 0 || (!allFirstRow && allRemainFirstRow) {
		// If all the aggregate functions are pruned, we should add an aggregate function to maintain the info of row numbers.
		// For all the aggregate functions except `first_row`, if we have an empty table defined as t(a,b),
		// `select agg(a) from t` would always return one row, while `select agg(a) from t group by b` would return empty.
		// For `first_row` which is only used internally by tidb, `first_row(a)` would always return empty for empty input now.
		var err error
		var newAgg *aggregation.AggFuncDesc
		if allFirstRow {
			newAgg, err = aggregation.NewAggFuncDesc(la.SCtx().GetExprCtx(), ast.AggFuncFirstRow, []expression.Expression{expression.NewOne()}, false)
		} else {
			newAgg, err = aggregation.NewAggFuncDesc(la.SCtx().GetExprCtx(), ast.AggFuncCount, []expression.Expression{expression.NewOne()}, false)
		}
		if err != nil {
			return nil, err
		}
		la.AggFuncs = append(la.AggFuncs, newAgg)
		col := &expression.Column{
			UniqueID: la.SCtx().GetSessionVars().AllocPlanColumnID(),
			RetType:  newAgg.RetTp,
		}
		la.Schema().Columns = append(la.Schema().Columns, col)
	}

	if len(la.GroupByItems) > 0 {
		for i := len(la.GroupByItems) - 1; i >= 0; i-- {
			cols := expression.ExtractColumns(la.GroupByItems[i])
			if len(cols) == 0 && !expression.ExprHasSetVarOrSleep(la.GroupByItems[i]) {
				prunedGroupByItems = append(prunedGroupByItems, la.GroupByItems[i])
				la.GroupByItems = append(la.GroupByItems[:i], la.GroupByItems[i+1:]...)
			} else {
				selfUsedCols = append(selfUsedCols, cols...)
			}
		}
		// If all the group by items are pruned, we should add a constant 1 to keep the correctness.
		// Because `select count(*) from t` is different from `select count(*) from t group by 1`.
		if len(la.GroupByItems) == 0 {
			la.GroupByItems = []expression.Expression{expression.NewOne()}
		}
	}
	logicaltrace.AppendGroupByItemsPruneTraceStep(la, prunedGroupByItems, opt)
	var err error
	la.Children()[0], err = child.PruneColumns(selfUsedCols, opt)
	if err != nil {
		return nil, err
	}
	// update children[0]
	child = la.Children()[0]
	// Do an extra Projection Elimination here. This is specially for empty Projection below Aggregation.
	// This kind of Projection would cause some bugs for MPP plan and is safe to be removed.
	// This kind of Projection should be removed in Projection Elimination, but currently PruneColumnsAgain is
	// the last rule. So we specially handle this case here.
	if childProjection, isProjection := child.(*LogicalProjection); isProjection {
		if len(childProjection.Exprs) == 0 && childProjection.Schema().Len() == 0 {
			childOfChild := childProjection.Children()[0]
			la.SetChildren(childOfChild)
		}
	}
	return la, nil
}

// FindBestTask inherits BaseLogicalPlan.LogicalPlan.<3rd> implementation.

// BuildKeyInfo implements base.LogicalPlan.<4th> interface.
func (la *LogicalAggregation) BuildKeyInfo(selfSchema *expression.Schema, childSchema []*expression.Schema) {
	// According to issue #46962, we can ignore the judgment of partial agg.
	// Sometimes the agg is inside a subquery and there is a true condition in the where clause, so the agg function list is empty.
	// For example, ``` select xxxx from xxx WHERE TRUE = ALL ( SELECT TRUE GROUP BY 1 LIMIT 1 ) IS NULL IS NOT NULL;
	// In this case, the agg is in complete mode and we can ignore this check.
	if len(la.AggFuncs) != 0 && la.IsPartialModeAgg() {
		return
	}
	la.LogicalSchemaProducer.BuildKeyInfo(selfSchema, childSchema)
	la.buildSelfKeyInfo(selfSchema)
}

// PushDownTopN inherits BaseLogicalPlan.LogicalPlan.<5th> implementation.

// DeriveTopN inherits BaseLogicalPlan.LogicalPlan.<6th> implementation.

// PredicateSimplification inherits BaseLogicalPlan.LogicalPlan.<7th> implementation.

// ConstantPropagation inherits BaseLogicalPlan.LogicalPlan.<8th> implementation.

// PullUpConstantPredicates inherits BaseLogicalPlan.LogicalPlan.<9th> implementation.

// RecursiveDeriveStats inherits BaseLogicalPlan.LogicalPlan.<10th> implementation.

// DeriveStats implements base.LogicalPlan.<11th> interface.
func (la *LogicalAggregation) DeriveStats(childStats []*property.StatsInfo, selfSchema *expression.Schema, childSchema []*expression.Schema, colGroups [][]*expression.Column) (*property.StatsInfo, error) {
	childProfile := childStats[0]
	gbyCols := make([]*expression.Column, 0, len(la.GroupByItems))
	for _, gbyExpr := range la.GroupByItems {
		cols := expression.ExtractColumns(gbyExpr)
		gbyCols = append(gbyCols, cols...)
	}
	if la.StatsInfo() != nil {
		// Reload GroupNDVs since colGroups may have changed.
		la.StatsInfo().GroupNDVs = la.getGroupNDVs(colGroups, childProfile, gbyCols)
		return la.StatsInfo(), nil
	}
	ndv, _ := cardinality.EstimateColsNDVWithMatchedLen(gbyCols, childSchema[0], childProfile)
	la.SetStats(&property.StatsInfo{
		RowCount: ndv,
		ColNDVs:  make(map[int64]float64, selfSchema.Len()),
	})
	// We cannot estimate the ColNDVs for every output, so we use a conservative strategy.
	for _, col := range selfSchema.Columns {
		la.StatsInfo().ColNDVs[col.UniqueID] = ndv
	}
	la.InputCount = childProfile.RowCount
	la.StatsInfo().GroupNDVs = la.getGroupNDVs(colGroups, childProfile, gbyCols)
	return la.StatsInfo(), nil
}
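An illustrative note (not part of the diff): the estimate above reduces to RowCount = NDV(group-by columns) — grouping the child's rows by columns with d distinct value combinations yields at most d groups, and each output column's NDV is conservatively set to d as well. For example, with a child of 10,000 rows and NDV(a, b) ≈ 120, the aggregation's estimated RowCount and per-column NDVs are all 120.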

// ExtractColGroups implements base.LogicalPlan.<12th> interface.
func (la *LogicalAggregation) ExtractColGroups(_ [][]*expression.Column) [][]*expression.Column {
	// Parent colGroups would be discarded, because aggregation would make NDV of colGroups
	// which does not match GroupByItems invalid.
	// Note that gbyCols may not be the exact GROUP BY columns, e.g., GROUP BY a+b,
	// but we have no other approaches for the NDV estimation of these cases
	// except for using the independent assumption, unless we can use stats of expression index.
	gbyCols := make([]*expression.Column, 0, len(la.GroupByItems))
	for _, gbyExpr := range la.GroupByItems {
		cols := expression.ExtractColumns(gbyExpr)
		gbyCols = append(gbyCols, cols...)
	}
	if len(gbyCols) > 1 {
		return [][]*expression.Column{expression.SortColumns(gbyCols)}
	}
	return nil
}

// PreparePossibleProperties implements base.LogicalPlan.<13th> interface.
func (la *LogicalAggregation) PreparePossibleProperties(_ *expression.Schema, childrenProperties ...[][]*expression.Column) [][]*expression.Column {
	childProps := childrenProperties[0]
	// If there's no group-by item, the stream aggregation could have no order property. So we can add an empty property
	// when its group-by item is empty.
	if len(la.GroupByItems) == 0 {
		la.PossibleProperties = [][]*expression.Column{nil}
		return nil
	}
	resultProperties := make([][]*expression.Column, 0, len(childProps))
	groupByCols := la.GetGroupByCols()
	for _, possibleChildProperty := range childProps {
		sortColOffsets := util.GetMaxSortPrefix(possibleChildProperty, groupByCols)
		if len(sortColOffsets) == len(groupByCols) {
			prop := possibleChildProperty[:len(groupByCols)]
			resultProperties = append(resultProperties, prop)
		}
	}
	la.PossibleProperties = resultProperties
	return resultProperties
}

// ExhaustPhysicalPlans implements base.LogicalPlan.<14th> interface.
func (la *LogicalAggregation) ExhaustPhysicalPlans(prop *property.PhysicalProperty) ([]base.PhysicalPlan, bool, error) {
	if la.PreferAggToCop {
		if !la.CanPushToCop(kv.TiKV) {
			la.SCtx().GetSessionVars().StmtCtx.SetHintWarning(
				"Optimizer Hint AGG_TO_COP is inapplicable")
			la.PreferAggToCop = false
		}
	}

	preferHash, preferStream := la.ResetHintIfConflicted()

	hashAggs := utilfuncp.GetHashAggs(la, prop)
	if hashAggs != nil && preferHash {
		return hashAggs, true, nil
	}

	streamAggs := utilfuncp.GetStreamAggs(la, prop)
	if streamAggs != nil && preferStream {
		return streamAggs, true, nil
	}

	aggs := append(hashAggs, streamAggs...)

	if streamAggs == nil && preferStream && !prop.IsSortItemEmpty() {
		la.SCtx().GetSessionVars().StmtCtx.SetHintWarning("Optimizer Hint STREAM_AGG is inapplicable")
	}

	return aggs, !(preferStream || preferHash), nil
}

// ExtractCorrelatedCols implements base.LogicalPlan.<15th> interface.
func (la *LogicalAggregation) ExtractCorrelatedCols() []*expression.CorrelatedColumn {
	corCols := make([]*expression.CorrelatedColumn, 0, len(la.GroupByItems)+len(la.AggFuncs))
	for _, expr := range la.GroupByItems {
		corCols = append(corCols, expression.ExtractCorColumns(expr)...)
	}
	for _, fun := range la.AggFuncs {
		for _, arg := range fun.Args {
			corCols = append(corCols, expression.ExtractCorColumns(arg)...)
		}
		for _, arg := range fun.OrderByItems {
			corCols = append(corCols, expression.ExtractCorColumns(arg.Expr)...)
		}
	}
	return corCols
}

// MaxOneRow inherits BaseLogicalPlan.LogicalPlan.<16th> implementation.

// Children inherits BaseLogicalPlan.LogicalPlan.<17th> implementation.

// SetChildren inherits BaseLogicalPlan.LogicalPlan.<18th> implementation.

// SetChild inherits BaseLogicalPlan.LogicalPlan.<19th> implementation.

// RollBackTaskMap inherits BaseLogicalPlan.LogicalPlan.<20th> implementation.

// CanPushToCop implements base.LogicalPlan.<21st> interface.
func (la *LogicalAggregation) CanPushToCop(storeTp kv.StoreType) bool {
	return la.BaseLogicalPlan.CanPushToCop(storeTp) && !la.NoCopPushDown
}

// ExtractFD implements base.LogicalPlan.<22nd> interface.
// 1:
// In most of the cases, using FDs to check the only_full_group_by problem should be done in the buildAggregation phase
// by extracting the bottom-up FDs graph from the `p` --- the sub plan tree that has already been built.
//
// 2:
// and this requires that some condition push-down into `p`, like selection, is done before building the aggregation;
// otherwise, 'a=1 and a can occur in the select lists of a group by' would be mis-checked because it isn't implied in the known FDs graph.
//
// 3:
// when a logical agg is built, its schema columns indicate what the permitted non-agg columns are. Therefore, we shouldn't
// depend on logicalAgg.ExtractFD() to finish the only_full_group_by check; that should be done by 1 & 2.
func (la *LogicalAggregation) ExtractFD() *fd.FDSet {
	// basically extract the children's fdSet.
	fds := la.LogicalSchemaProducer.ExtractFD()
	// collect the output columns' unique ID.
	outputColsUniqueIDs := intset.NewFastIntSet()
	notnullColsUniqueIDs := intset.NewFastIntSet()
	groupByColsUniqueIDs := intset.NewFastIntSet()
	groupByColsOutputCols := intset.NewFastIntSet()
	// Since the aggregation is built ahead of the projection, the latter will reuse the column with UniqueID allocated in the aggregation
	// via aggMapper, so we don't need to unnecessarily maintain the <aggDes, UniqueID> mapping in the FDSet like expr did; just treat
	// it as a normal column.
	for _, one := range la.Schema().Columns {
		outputColsUniqueIDs.Insert(int(one.UniqueID))
	}
	// For one like sum(a), we don't need to build a functional dependency from a --> sum(a), because it's only determined by the
	// group-by-item (group-by-item --> sum(a)).
	for _, expr := range la.GroupByItems {
		switch x := expr.(type) {
		case *expression.Column:
			groupByColsUniqueIDs.Insert(int(x.UniqueID))
		case *expression.CorrelatedColumn:
			// shouldn't be here, intercepted by plan builder as unknown column.
			continue
		case *expression.Constant:
			// shouldn't be here, interpreted as pos param by plan builder.
			continue
		case *expression.ScalarFunction:
			hashCode := string(x.HashCode())
			var (
				ok             bool
				scalarUniqueID int
			)
			if scalarUniqueID, ok = fds.IsHashCodeRegistered(hashCode); ok {
				groupByColsUniqueIDs.Insert(scalarUniqueID)
			} else {
				// retrieve unique plan column id. 1: completely new one, allocating new unique id. 2: registered by projection earlier, using it.
				if scalarUniqueID, ok = la.SCtx().GetSessionVars().MapHashCode2UniqueID4ExtendedCol[hashCode]; !ok {
					scalarUniqueID = int(la.SCtx().GetSessionVars().AllocPlanColumnID())
				}
				fds.RegisterUniqueID(hashCode, scalarUniqueID)
				groupByColsUniqueIDs.Insert(scalarUniqueID)
			}
			determinants := intset.NewFastIntSet()
			extractedColumns := expression.ExtractColumns(x)
			extractedCorColumns := expression.ExtractCorColumns(x)
			for _, one := range extractedColumns {
				determinants.Insert(int(one.UniqueID))
				groupByColsOutputCols.Insert(int(one.UniqueID))
			}
			for _, one := range extractedCorColumns {
				determinants.Insert(int(one.UniqueID))
				groupByColsOutputCols.Insert(int(one.UniqueID))
			}
			notnull := util.IsNullRejected(la.SCtx(), la.Schema(), x)
			if notnull || determinants.SubsetOf(fds.NotNullCols) {
				notnullColsUniqueIDs.Insert(scalarUniqueID)
			}
			fds.AddStrictFunctionalDependency(determinants, intset.NewFastIntSet(scalarUniqueID))
		}
	}

	// Some details:
	// For now, select max(a) from t group by c, tidb will see `max(a)` as Max aggDes and `a,b,c` as firstRow aggDes,
	// and keep them all in the schema columns before projection does the pruning. If we build the fake FD eg: {c} ~~> {b}
	// here since we have seen b as firstRow aggDes, for the upper layer projection of `select max(a), b from t group by c`,
	// it will take b as valid projection field of group by statement since it has existed in the FD with {c} ~~> {b}.
	//
	// and since any_value will NOT be pushed down to agg schema, which means every firstRow aggDes in the agg logical operator
	// is meaningless to build the FD with. Let's only store the non-firstRow FD down: {group by items} ~~> {real aggDes}
	realAggFuncUniqueID := intset.NewFastIntSet()
	for i, aggDes := range la.AggFuncs {
		if aggDes.Name != "firstrow" {
			realAggFuncUniqueID.Insert(int(la.Schema().Columns[i].UniqueID))
		}
	}

	// apply operator's characteristic's FD setting.
	if len(la.GroupByItems) == 0 {
		// 1: as the details shown above, output cols (normal column seen as firstrow) of group by are not validated.
		// we couldn't merge them as constant FD with origin constant FD together before projection done.
		// fds.MaxOneRow(outputColsUniqueIDs.Union(groupByColsOutputCols))
		//
		// 2: for the convenience of later judgement, when there are no group by items, we will store a FD: {0} -> {real aggDes}
		// 0 unique id is only used for here.
		groupByColsUniqueIDs.Insert(0)
		for i, ok := realAggFuncUniqueID.Next(0); ok; i, ok = realAggFuncUniqueID.Next(i + 1) {
			fds.AddStrictFunctionalDependency(groupByColsUniqueIDs, intset.NewFastIntSet(i))
		}
	} else {
		// eliminating input columns that are un-projected.
		fds.ProjectCols(outputColsUniqueIDs.Union(groupByColsOutputCols).Union(groupByColsUniqueIDs))

		// note: {a} --> {b,c} is not the same as {a} --> {b} and {a} --> {c}
		for i, ok := realAggFuncUniqueID.Next(0); ok; i, ok = realAggFuncUniqueID.Next(i + 1) {
			// group by phrase always produces strict FD.
			// 1: it can always distinguish and group the all-null/part-null group column rows.
			// 2: the rows with all/part null group columns are unique rows after the group operation.
			// 3: there won't be two identical group keys with different agg values, so strict FD is secured.
			fds.AddStrictFunctionalDependency(groupByColsUniqueIDs, intset.NewFastIntSet(i))
		}

		// agg funcDes has been tagged with the not-null flag when building aggregation.
		fds.MakeNotNull(notnullColsUniqueIDs)
	}
	fds.GroupByCols = groupByColsUniqueIDs
	fds.HasAggBuilt = true
	// just trace it down in every operator for test checking.
	la.SetFDs(fds)
	return fds
}
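An illustrative aside (not part of the diff): the net effect of ExtractFD for a grouped aggregation is the strict dependency {group-by columns} --> {each real agg column}, which is what an only_full_group_by check can consume. A minimal self-contained sketch of that reasoning, modeling column unique IDs as ints — hypothetical types, not the pkg/planner/funcdep API:

```go
package main

import "fmt"

// fdEdge is a hypothetical strict functional dependency: from determines to.
type fdEdge struct {
	from []int // determinant column unique IDs, e.g. the group-by columns
	to   []int // dependent column unique IDs, e.g. a real agg output column
}

// closure returns every column determined by start under the given strict FDs.
func closure(fds []fdEdge, start []int) map[int]bool {
	out := make(map[int]bool, len(start))
	for _, id := range start {
		out[id] = true
	}
	for changed := true; changed; {
		changed = false
		for _, e := range fds {
			applies := true
			for _, id := range e.from {
				applies = applies && out[id]
			}
			if !applies {
				continue
			}
			for _, id := range e.to {
				if !out[id] {
					out[id], changed = true, true
				}
			}
		}
	}
	return out
}

func main() {
	// select c, max(a) from t group by c: column 3 is c, column 7 is max(a).
	// The aggregation records {c} --> {max(a)}, so selecting column 7 is legal
	// under only_full_group_by because it lies in the closure of {3}.
	fds := []fdEdge{{from: []int{3}, to: []int{7}}}
	fmt.Println(closure(fds, []int{3})[7]) // true
}
```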

// GetBaseLogicalPlan inherits BaseLogicalPlan.LogicalPlan.<23rd> implementation.

// ConvertOuterToInnerJoin inherits BaseLogicalPlan.LogicalPlan.<24th> implementation.

// *************************** end implementation of logicalPlan interface ***************************

// HasDistinct shows whether LogicalAggregation has functions with distinct.
func (la *LogicalAggregation) HasDistinct() bool {
	for _, aggFunc := range la.AggFuncs {
		if aggFunc.HasDistinct {
			return true
		}
	}
	return false
}

// HasOrderBy shows whether LogicalAggregation has functions with order-by items.
func (la *LogicalAggregation) HasOrderBy() bool {
	for _, aggFunc := range la.AggFuncs {
		if len(aggFunc.OrderByItems) > 0 {
			return true
		}
	}
	return false
}

// CopyAggHints copies the aggHints from another LogicalAggregation.
func (la *LogicalAggregation) CopyAggHints(agg *LogicalAggregation) {
	// TODO: Copy the hint may make the un-applicable hint throw the
	// same warning message more than once. We'd better add a flag for
	// `HaveThrownWarningMessage` to avoid this. Besides, finalAgg and
	// partialAgg (in cascades planner) should share the same hint, instead
	// of a copy.
	la.PreferAggType = agg.PreferAggType
	la.PreferAggToCop = agg.PreferAggToCop
}

// IsPartialModeAgg returns if all of the AggFuncs are partialMode.
func (la *LogicalAggregation) IsPartialModeAgg() bool {
	// Since all of the AggFunc share the same AggMode, we only need to check the first one.
	return la.AggFuncs[0].Mode == aggregation.Partial1Mode
}

// IsCompleteModeAgg returns if all of the AggFuncs are CompleteMode.
func (la *LogicalAggregation) IsCompleteModeAgg() bool {
	// Since all of the AggFunc share the same AggMode, we only need to check the first one.
	return la.AggFuncs[0].Mode == aggregation.CompleteMode
}

// GetGroupByCols returns the columns that are group-by items.
// For example, `group by a, b, c+d` will return [a, b].
func (la *LogicalAggregation) GetGroupByCols() []*expression.Column {
	groupByCols := make([]*expression.Column, 0, len(la.GroupByItems))
	for _, item := range la.GroupByItems {
		if col, ok := item.(*expression.Column); ok {
			groupByCols = append(groupByCols, col)
		}
	}
	return groupByCols
}

// GetPotentialPartitionKeys returns potential partition keys for aggregation; the potential partition keys are the group-by keys.
func (la *LogicalAggregation) GetPotentialPartitionKeys() []*property.MPPPartitionColumn {
	groupByCols := make([]*property.MPPPartitionColumn, 0, len(la.GroupByItems))
	for _, item := range la.GroupByItems {
		if col, ok := item.(*expression.Column); ok {
			groupByCols = append(groupByCols, &property.MPPPartitionColumn{
				Col:       col,
				CollateID: property.GetCollateIDByNameForPartition(col.GetStaticType().GetCollate()),
			})
		}
	}
	return groupByCols
}

// GetUsedCols extracts all of the Columns used by agg including GroupByItems and AggFuncs.
func (la *LogicalAggregation) GetUsedCols() (usedCols []*expression.Column) {
	for _, groupByItem := range la.GroupByItems {
		usedCols = append(usedCols, expression.ExtractColumns(groupByItem)...)
	}
	for _, aggDesc := range la.AggFuncs {
		for _, expr := range aggDesc.Args {
			usedCols = append(usedCols, expression.ExtractColumns(expr)...)
		}
		for _, expr := range aggDesc.OrderByItems {
			usedCols = append(usedCols, expression.ExtractColumns(expr.Expr)...)
		}
	}
	return usedCols
}

// ResetHintIfConflicted resets the PreferAggType if they are conflicted,
// and returns the two PreferAggType hints.
func (la *LogicalAggregation) ResetHintIfConflicted() (preferHash bool, preferStream bool) {
	preferHash = (la.PreferAggType & h.PreferHashAgg) > 0
	preferStream = (la.PreferAggType & h.PreferStreamAgg) > 0
	if preferHash && preferStream {
		la.SCtx().GetSessionVars().StmtCtx.SetHintWarning("Optimizer aggregation hints are conflicted")
		la.PreferAggType = 0
		preferHash, preferStream = false, false
	}
	return
}
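An illustrative aside (not part of the diff): PreferAggType is a bit set, so the HASH_AGG and STREAM_AGG hints can be tested independently and cleared together when they conflict. A self-contained sketch of the same flag logic, with hypothetical constants standing in for pkg/util/hint's PreferHashAgg/PreferStreamAgg:

```go
package main

import "fmt"

const (
	preferHashAgg   uint = 1 << 0 // stands in for hint.PreferHashAgg
	preferStreamAgg uint = 1 << 1 // stands in for hint.PreferStreamAgg
)

// resetIfConflicted mirrors ResetHintIfConflicted: if both hints are set,
// drop both and report neither preference.
func resetIfConflicted(preferAggType uint) (uint, bool, bool) {
	preferHash := preferAggType&preferHashAgg > 0
	preferStream := preferAggType&preferStreamAgg > 0
	if preferHash && preferStream {
		return 0, false, false // conflicting hints cancel out
	}
	return preferAggType, preferHash, preferStream
}

func main() {
	_, hash, stream := resetIfConflicted(preferHashAgg | preferStreamAgg)
	fmt.Println(hash, stream) // false false
}
```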

// DistinctArgsMeetsProperty checks if the distinct args meet the property.
func (la *LogicalAggregation) DistinctArgsMeetsProperty() bool {
	for _, aggFunc := range la.AggFuncs {
		if aggFunc.HasDistinct {
			for _, distinctArg := range aggFunc.Args {
				if !expression.Contains(la.GroupByItems, distinctArg) {
					return false
				}
			}
		}
	}
	return true
}
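An illustrative note (not part of the diff): the check requires every DISTINCT argument to already be a group-by item. For `select count(distinct a) from t group by a, b` it passes, since each group holds a single value of `a`; for `select count(distinct a) from t group by b` it fails, and the stream-agg/pushdown call sites above fall back accordingly.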
|
||||
// pushDownPredicatesForAggregation split a CNF condition to two parts, can be pushed-down or can not be pushed-down below aggregation.
|
||||
// It would consider the CNF.
|
||||
// For example,
|
||||
// (a > 1 or avg(b) > 1) and (a < 3), and `avg(b) > 1` can't be pushed-down.
|
||||
// Then condsToPush: a < 3, ret: a > 1 or avg(b) > 1
|
||||
func (la *LogicalAggregation) pushDownCNFPredicatesForAggregation(cond expression.Expression, groupByColumns *expression.Schema, exprsOriginal []expression.Expression) ([]expression.Expression, []expression.Expression) {
|
||||
var condsToPush []expression.Expression
|
||||
var ret []expression.Expression
|
||||
subCNFItem := expression.SplitCNFItems(cond)
|
||||
if len(subCNFItem) == 1 {
|
||||
return la.pushDownPredicatesForAggregation(subCNFItem[0], groupByColumns, exprsOriginal)
|
||||
}
|
||||
exprCtx := la.SCtx().GetExprCtx()
|
||||
for _, item := range subCNFItem {
|
||||
condsToPushForItem, retForItem := la.pushDownDNFPredicatesForAggregation(item, groupByColumns, exprsOriginal)
|
||||
if len(condsToPushForItem) > 0 {
|
||||
condsToPush = append(condsToPush, expression.ComposeDNFCondition(exprCtx, condsToPushForItem...))
|
||||
}
|
||||
if len(retForItem) > 0 {
|
||||
ret = append(ret, expression.ComposeDNFCondition(exprCtx, retForItem...))
|
||||
}
|
||||
}
|
||||
return condsToPush, ret
|
||||
}
|
||||
|
||||
// pushDownDNFPredicatesForAggregation splits a DNF condition into two parts: the conditions that can be
// pushed down below the aggregation and those that cannot. For example, given
// (a > 1 and avg(b) > 1) or (a < 3), where `avg(b) > 1` can't be pushed down,
// condsToPush is `(a > 1) or (a < 3)` and ret is `(a > 1 and avg(b) > 1) or (a < 3)`.
func (la *LogicalAggregation) pushDownDNFPredicatesForAggregation(cond expression.Expression, groupByColumns *expression.Schema, exprsOriginal []expression.Expression) ([]expression.Expression, []expression.Expression) {
//nolint: prealloc
var condsToPush []expression.Expression
var ret []expression.Expression
subDNFItem := expression.SplitDNFItems(cond)
if len(subDNFItem) == 1 {
return la.pushDownPredicatesForAggregation(subDNFItem[0], groupByColumns, exprsOriginal)
}
exprCtx := la.SCtx().GetExprCtx()
for _, item := range subDNFItem {
condsToPushForItem, retForItem := la.pushDownCNFPredicatesForAggregation(item, groupByColumns, exprsOriginal)
if len(condsToPushForItem) <= 0 {
return nil, []expression.Expression{cond}
}
condsToPush = append(condsToPush, expression.ComposeCNFCondition(exprCtx, condsToPushForItem...))
if len(retForItem) > 0 {
ret = append(ret, expression.ComposeCNFCondition(exprCtx, retForItem...))
}
}
if len(ret) == 0 {
// All the conditions can be pushed down.
return []expression.Expression{cond}, nil
}
dnfPushDownCond := expression.ComposeDNFCondition(exprCtx, condsToPush...)
// Some conditions can't be pushed down; we need to keep all of them.
return []expression.Expression{dnfPushDownCond}, []expression.Expression{cond}
}

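A runnable, heavily simplified sketch of how the CNF walk above and the DNF walk cooperate on the documented example. Toy string predicates only: the toy treats any function call as an aggregate, and unlike the real code it never pushes a weakened copy of a partially pushable disjunction.

package main

import (
    "fmt"
    "strings"
)

// pushable reports whether a toy predicate references only group-by columns.
// Toy rule: any function call (it contains "(") is treated as an aggregate.
func pushable(pred string, groupBy map[string]bool) bool {
    if strings.Contains(pred, "(") {
        return false
    }
    return groupBy[strings.Fields(pred)[0]]
}

// splitCNF mimics the top-level CNF walk: a conjunct is pushed only when every
// disjunct inside it is pushable; otherwise the whole conjunct is retained.
func splitCNF(conjuncts [][]string, groupBy map[string]bool) (push, ret []string) {
    for _, disjuncts := range conjuncts {
        ok := true
        for _, d := range disjuncts {
            if !pushable(d, groupBy) {
                ok = false
                break
            }
        }
        item := strings.Join(disjuncts, " or ")
        if ok {
            push = append(push, item)
        } else {
            ret = append(ret, item)
        }
    }
    return
}

func main() {
    groupBy := map[string]bool{"a": true}
    // (a > 1 or avg(b) > 1) and (a < 3), grouped by a:
    push, ret := splitCNF([][]string{{"a > 1", "avg(b) > 1"}, {"a < 3"}}, groupBy)
    fmt.Println(push) // [a < 3]
    fmt.Println(ret)  // [a > 1 or avg(b) > 1]
}
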
// splitCondForAggregation splits the conditions into those that can be pushed down and those that cannot.
func (la *LogicalAggregation) splitCondForAggregation(predicates []expression.Expression) ([]expression.Expression, []expression.Expression) {
var condsToPush []expression.Expression
var ret []expression.Expression
exprsOriginal := make([]expression.Expression, 0, len(la.AggFuncs))
for _, fun := range la.AggFuncs {
exprsOriginal = append(exprsOriginal, fun.Args[0])
}
groupByColumns := expression.NewSchema(la.GetGroupByCols()...)
// It's almost the same as pushDownCNFPredicatesForAggregation, except that the condition is a slice.
for _, cond := range predicates {
subCondsToPush, subRet := la.pushDownDNFPredicatesForAggregation(cond, groupByColumns, exprsOriginal)
if len(subCondsToPush) > 0 {
condsToPush = append(condsToPush, subCondsToPush...)
}
if len(subRet) > 0 {
ret = append(ret, subRet...)
}
}
return condsToPush, ret
}

// pushDownPredicatesForAggregation splits a single condition into two parts: the part that can be pushed down below the aggregation and the part that cannot.
func (la *LogicalAggregation) pushDownPredicatesForAggregation(cond expression.Expression, groupByColumns *expression.Schema, exprsOriginal []expression.Expression) ([]expression.Expression, []expression.Expression) {
var condsToPush []expression.Expression
var ret []expression.Expression
switch cond.(type) {
case *expression.Constant:
condsToPush = append(condsToPush, cond)
// Consider the SQL "select sum(b) from t group by a having 1=0". "1=0" is a constant predicate that should be
// retained and pushed down at the same time; otherwise we would get a wrong query result containing one row
// with value 0 rather than an empty query result.
ret = append(ret, cond)
case *expression.ScalarFunction:
extractedCols := expression.ExtractColumns(cond)
ok := true
for _, col := range extractedCols {
if !groupByColumns.Contains(col) {
ok = false
break
}
}
if ok {
newFunc := expression.ColumnSubstitute(la.SCtx().GetExprCtx(), cond, la.Schema(), exprsOriginal)
condsToPush = append(condsToPush, newFunc)
} else {
ret = append(ret, cond)
}
default:
ret = append(ret, cond)
}
return condsToPush, ret
}

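The ScalarFunction branch above boils down to a membership test: a predicate may move below the aggregation only if every column it references is a group-by column. A minimal self-contained sketch of that rule (string names standing in for *expression.Column):

package main

import "fmt"

// canPushBelowAgg reports whether a predicate referencing predicateCols may
// move below an aggregation grouped by groupByCols.
func canPushBelowAgg(predicateCols, groupByCols []string) bool {
    inGroupBy := make(map[string]bool, len(groupByCols))
    for _, c := range groupByCols {
        inGroupBy[c] = true
    }
    for _, c := range predicateCols {
        if !inGroupBy[c] {
            return false // depends on an aggregated value; must stay above the agg
        }
    }
    return true
}

func main() {
    groupBy := []string{"a"}
    fmt.Println(canPushBelowAgg([]string{"a"}, groupBy)) // true:  a < 3 is pushable
    fmt.Println(canPushBelowAgg([]string{"b"}, groupBy)) // false: avg(b) > 1 is not
}
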
func (la *LogicalAggregation) buildSelfKeyInfo(selfSchema *expression.Schema) {
groupByCols := la.GetGroupByCols()
if len(groupByCols) == len(la.GroupByItems) && len(la.GroupByItems) > 0 {
indices := selfSchema.ColumnsIndices(groupByCols)
if indices != nil {
newKey := make([]*expression.Column, 0, len(indices))
for _, i := range indices {
newKey = append(newKey, selfSchema.Columns[i])
}
selfSchema.Keys = append(selfSchema.Keys, newKey)
}
}
if len(la.GroupByItems) == 0 {
la.SetMaxOneRow(true)
}
}

// canPullUp checks if an aggregation can be pulled up. An aggregate function like count(*) cannot be pulled up.
func (la *LogicalAggregation) canPullUp() bool {
if len(la.GroupByItems) > 0 {
return false
}
for _, f := range la.AggFuncs {
for _, arg := range f.Args {
expr := expression.EvaluateExprWithNull(la.SCtx().GetExprCtx(), la.Children()[0].Schema(), arg)
if con, ok := expr.(*expression.Constant); !ok || !con.Value.IsNull() {
return false
}
}
}
return true
}

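The loop above relies on expression.EvaluateExprWithNull: roughly, an aggregation without group-by items may be pulled up only if every aggregate argument collapses to NULL once all child columns are NULL, so rows made entirely of NULLs cannot change the result; count(*) over the constant 1 does not collapse and is therefore not pullable. A toy rendering of that test (simplified expressions, not the planner's types):

package main

import "fmt"

// toyExpr is a simplified aggregate argument: either a column reference or a literal.
type toyExpr struct {
    isColumn bool
    lit      string
}

// evalWithNull mimics expression.EvaluateExprWithNull: every child column becomes NULL.
func evalWithNull(e toyExpr) (value string, isNull bool) {
    if e.isColumn {
        return "", true
    }
    return e.lit, false
}

func main() {
    sumArg := toyExpr{isColumn: true} // sum(a): collapses to NULL, so the agg is pullable
    countArg := toyExpr{lit: "1"}     // count(1)/count(*): stays 1, so it is not pullable
    _, pullable := evalWithNull(sumArg)
    _, alsoNull := evalWithNull(countArg)
    fmt.Println(pullable, alsoNull) // true false
}
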
func (*LogicalAggregation) getGroupNDVs(colGroups [][]*expression.Column, childProfile *property.StatsInfo, gbyCols []*expression.Column) []property.GroupNDV {
if len(colGroups) == 0 {
return nil
}
// Check if the child profile provides GroupNDV for the GROUP BY columns.
// Note that gbyCols may not be the exact GROUP BY columns, e.g., GROUP BY a+b,
// but we have no other approaches for the NDV estimation of these cases
// except for using the independent assumption, unless we can use stats of expression index.
groupNDV := childProfile.GetGroupNDV4Cols(gbyCols)
if groupNDV == nil {
return nil
}
return []property.GroupNDV{*groupNDV}
}
171
pkg/planner/core/logical_initialize.go
Normal file
@ -0,0 +1,171 @@

// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package core

import (
"github.com/pingcap/tidb/pkg/planner/core/base"
"github.com/pingcap/tidb/pkg/planner/core/operator/logicalop"
"github.com/pingcap/tidb/pkg/util/plancodec"
)

// Init initializes LogicalJoin.
func (p LogicalJoin) Init(ctx base.PlanContext, offset int) *LogicalJoin {
p.BaseLogicalPlan = logicalop.NewBaseLogicalPlan(ctx, plancodec.TypeJoin, &p, offset)
return &p
}

// Init initializes DataSource.
func (ds DataSource) Init(ctx base.PlanContext, offset int) *DataSource {
ds.BaseLogicalPlan = logicalop.NewBaseLogicalPlan(ctx, plancodec.TypeDataSource, &ds, offset)
return &ds
}

// Init initializes TiKVSingleGather.
func (sg TiKVSingleGather) Init(ctx base.PlanContext, offset int) *TiKVSingleGather {
sg.BaseLogicalPlan = logicalop.NewBaseLogicalPlan(ctx, plancodec.TypeTiKVSingleGather, &sg, offset)
return &sg
}

// Init initializes LogicalTableScan.
func (ts LogicalTableScan) Init(ctx base.PlanContext, offset int) *LogicalTableScan {
ts.BaseLogicalPlan = logicalop.NewBaseLogicalPlan(ctx, plancodec.TypeTableScan, &ts, offset)
return &ts
}

// Init initializes LogicalIndexScan.
func (is LogicalIndexScan) Init(ctx base.PlanContext, offset int) *LogicalIndexScan {
is.BaseLogicalPlan = logicalop.NewBaseLogicalPlan(ctx, plancodec.TypeIdxScan, &is, offset)
return &is
}

// Init initializes LogicalApply.
func (la LogicalApply) Init(ctx base.PlanContext, offset int) *LogicalApply {
la.BaseLogicalPlan = logicalop.NewBaseLogicalPlan(ctx, plancodec.TypeApply, &la, offset)
return &la
}

// Init initializes LogicalSelection.
func (p LogicalSelection) Init(ctx base.PlanContext, qbOffset int) *LogicalSelection {
p.BaseLogicalPlan = logicalop.NewBaseLogicalPlan(ctx, plancodec.TypeSel, &p, qbOffset)
return &p
}

// Init initializes LogicalUnionScan.
func (p LogicalUnionScan) Init(ctx base.PlanContext, qbOffset int) *LogicalUnionScan {
p.BaseLogicalPlan = logicalop.NewBaseLogicalPlan(ctx, plancodec.TypeUnionScan, &p, qbOffset)
return &p
}

// Init initializes LogicalProjection.
func (p LogicalProjection) Init(ctx base.PlanContext, qbOffset int) *LogicalProjection {
p.BaseLogicalPlan = logicalop.NewBaseLogicalPlan(ctx, plancodec.TypeProj, &p, qbOffset)
return &p
}

// Init initializes LogicalExpand.
func (p LogicalExpand) Init(ctx base.PlanContext, offset int) *LogicalExpand {
p.BaseLogicalPlan = logicalop.NewBaseLogicalPlan(ctx, plancodec.TypeExpand, &p, offset)
return &p
}

// Init initializes LogicalUnionAll.
func (p LogicalUnionAll) Init(ctx base.PlanContext, offset int) *LogicalUnionAll {
p.BaseLogicalPlan = logicalop.NewBaseLogicalPlan(ctx, plancodec.TypeUnion, &p, offset)
return &p
}

// Init initializes LogicalPartitionUnionAll.
func (p LogicalPartitionUnionAll) Init(ctx base.PlanContext, offset int) *LogicalPartitionUnionAll {
p.BaseLogicalPlan = logicalop.NewBaseLogicalPlan(ctx, plancodec.TypePartitionUnion, &p, offset)
return &p
}

// Init initializes LogicalSort.
func (ls LogicalSort) Init(ctx base.PlanContext, offset int) *LogicalSort {
ls.BaseLogicalPlan = logicalop.NewBaseLogicalPlan(ctx, plancodec.TypeSort, &ls, offset)
return &ls
}

// Init initializes LogicalTopN.
func (lt LogicalTopN) Init(ctx base.PlanContext, offset int) *LogicalTopN {
lt.BaseLogicalPlan = logicalop.NewBaseLogicalPlan(ctx, plancodec.TypeTopN, &lt, offset)
return &lt
}

// Init initializes LogicalLimit.
func (p LogicalLimit) Init(ctx base.PlanContext, offset int) *LogicalLimit {
p.BaseLogicalPlan = logicalop.NewBaseLogicalPlan(ctx, plancodec.TypeLimit, &p, offset)
return &p
}

// Init initializes LogicalTableDual.
func (p LogicalTableDual) Init(ctx base.PlanContext, offset int) *LogicalTableDual {
p.BaseLogicalPlan = logicalop.NewBaseLogicalPlan(ctx, plancodec.TypeDual, &p, offset)
return &p
}

// Init initializes LogicalMaxOneRow.
func (p LogicalMaxOneRow) Init(ctx base.PlanContext, offset int) *LogicalMaxOneRow {
p.BaseLogicalPlan = logicalop.NewBaseLogicalPlan(ctx, plancodec.TypeMaxOneRow, &p, offset)
return &p
}

// Init initializes LogicalWindow.
func (p LogicalWindow) Init(ctx base.PlanContext, offset int) *LogicalWindow {
p.BaseLogicalPlan = logicalop.NewBaseLogicalPlan(ctx, plancodec.TypeWindow, &p, offset)
return &p
}

// Init initializes LogicalShow.
func (p LogicalShow) Init(ctx base.PlanContext) *LogicalShow {
p.BaseLogicalPlan = logicalop.NewBaseLogicalPlan(ctx, plancodec.TypeShow, &p, 0)
return &p
}

// Init initializes LogicalShowDDLJobs.
func (p LogicalShowDDLJobs) Init(ctx base.PlanContext) *LogicalShowDDLJobs {
p.BaseLogicalPlan = logicalop.NewBaseLogicalPlan(ctx, plancodec.TypeShowDDLJobs, &p, 0)
return &p
}

// Init initializes LogicalLock.
func (p LogicalLock) Init(ctx base.PlanContext) *LogicalLock {
p.BaseLogicalPlan = logicalop.NewBaseLogicalPlan(ctx, plancodec.TypeLock, &p, 0)
return &p
}

// Init initializes LogicalMemTable.
func (p LogicalMemTable) Init(ctx base.PlanContext, offset int) *LogicalMemTable {
p.BaseLogicalPlan = logicalop.NewBaseLogicalPlan(ctx, plancodec.TypeMemTableScan, &p, offset)
return &p
}

// Init only assigns type and context.
func (p LogicalCTE) Init(ctx base.PlanContext, offset int) *LogicalCTE {
p.BaseLogicalPlan = logicalop.NewBaseLogicalPlan(ctx, plancodec.TypeCTE, &p, offset)
return &p
}

// Init only assigns type and context.
func (p LogicalCTETable) Init(ctx base.PlanContext, offset int) *LogicalCTETable {
p.BaseLogicalPlan = logicalop.NewBaseLogicalPlan(ctx, plancodec.TypeCTETable, &p, offset)
return &p
}

// Init initializes LogicalSequence.
func (p LogicalSequence) Init(ctx base.PlanContext, offset int) *LogicalSequence {
p.BaseLogicalPlan = logicalop.NewBaseLogicalPlan(ctx, plancodec.TypeSequence, &p, offset)
return &p
}
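
All of the Init methods above share one idiom: a value receiver plus a returned pointer, so each call initializes a fresh copy of the operator (this is also why `return &lt` in LogicalTopN.Init is valid). A minimal standalone sketch of the same pattern, with hypothetical types:

package main

import "fmt"

type basePlan struct{ tp string }

type topN struct {
    basePlan
    limit int
}

// Init mirrors the value-receiver style above: p is a copy, so &p escapes to
// the heap and the caller gets an independently initialized operator.
func (p topN) Init(tp string) *topN {
    p.basePlan = basePlan{tp: tp}
    return &p
}

func main() {
    lt := topN{limit: 10}.Init("TopN")
    fmt.Println(lt.tp, lt.limit) // TopN 10
}
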
@ -915,250 +915,6 @@ func (p *LogicalProjection) GetUsedCols() (usedCols []*expression.Column) {
return usedCols
}

// LogicalAggregation represents an aggregate plan.
type LogicalAggregation struct {
logicalop.LogicalSchemaProducer

AggFuncs []*aggregation.AggFuncDesc
GroupByItems []expression.Expression

// PreferAggType and PreferAggToCop store aggregation hint information.
PreferAggType uint
PreferAggToCop bool

PossibleProperties [][]*expression.Column
InputCount float64 // InputCount is the input count of this plan.

// NoCopPushDown indicates if planner must not push this agg down to coprocessor.
// It is true when the agg is in the outer child tree of apply.
NoCopPushDown bool
}

// HasDistinct shows whether LogicalAggregation has functions with distinct.
func (la *LogicalAggregation) HasDistinct() bool {
for _, aggFunc := range la.AggFuncs {
if aggFunc.HasDistinct {
return true
}
}
return false
}

// HasOrderBy shows whether LogicalAggregation has functions with order-by items.
func (la *LogicalAggregation) HasOrderBy() bool {
for _, aggFunc := range la.AggFuncs {
if len(aggFunc.OrderByItems) > 0 {
return true
}
}
return false
}

// ExtractFD implements the logical plan interface, extracting the FD from bottom up.
// 1:
// In most cases, using FDs to check the only_full_group_by problem should be done in the buildAggregation phase
// by extracting the bottom-up FDs graph from `p` --- the sub plan tree that has already been built.
//
// 2:
// This requires that some condition push-down into `p`, such as selection, be done before building the aggregation;
// otherwise, a case like "a=1, so a can occur in the select list of a group by" will be mis-checked because it is not implied in the known FDs graph.
//
// 3:
// When a logical agg is built, its schema columns indicate what the permitted non-agg columns are. Therefore, we shouldn't
// depend on logicalAgg.ExtractFD() to finish the only_full_group_by check; it should be done by 1 & 2 instead.
func (la *LogicalAggregation) ExtractFD() *fd.FDSet {
// basically extract the children's fdSet.
fds := la.LogicalSchemaProducer.ExtractFD()
// collect the output columns' unique IDs.
outputColsUniqueIDs := intset.NewFastIntSet()
notnullColsUniqueIDs := intset.NewFastIntSet()
groupByColsUniqueIDs := intset.NewFastIntSet()
groupByColsOutputCols := intset.NewFastIntSet()
// Since the aggregation is built ahead of the projection, the latter will reuse the columns with UniqueIDs allocated in the aggregation
// via aggMapper, so we don't need to unnecessarily maintain the <aggDes, UniqueID> mapping in the FDSet as expr did; just treat
// it as a normal column.
for _, one := range la.Schema().Columns {
outputColsUniqueIDs.Insert(int(one.UniqueID))
}
// For one like sum(a), we don't need to build the functional dependency a --> sum(a), because it's only determined by the
// group-by item (group-by-item --> sum(a)).
for _, expr := range la.GroupByItems {
switch x := expr.(type) {
case *expression.Column:
groupByColsUniqueIDs.Insert(int(x.UniqueID))
case *expression.CorrelatedColumn:
// shouldn't be here, intercepted by plan builder as unknown column.
continue
case *expression.Constant:
// shouldn't be here, interpreted as pos param by plan builder.
continue
case *expression.ScalarFunction:
hashCode := string(x.HashCode())
var (
ok bool
scalarUniqueID int
)
if scalarUniqueID, ok = fds.IsHashCodeRegistered(hashCode); ok {
groupByColsUniqueIDs.Insert(scalarUniqueID)
} else {
// Retrieve the unique plan column id: 1: a completely new one, allocating a new unique id; 2: registered by a projection earlier, reusing it.
if scalarUniqueID, ok = la.SCtx().GetSessionVars().MapHashCode2UniqueID4ExtendedCol[hashCode]; !ok {
scalarUniqueID = int(la.SCtx().GetSessionVars().AllocPlanColumnID())
}
fds.RegisterUniqueID(hashCode, scalarUniqueID)
groupByColsUniqueIDs.Insert(scalarUniqueID)
}
determinants := intset.NewFastIntSet()
extractedColumns := expression.ExtractColumns(x)
extractedCorColumns := expression.ExtractCorColumns(x)
for _, one := range extractedColumns {
determinants.Insert(int(one.UniqueID))
groupByColsOutputCols.Insert(int(one.UniqueID))
}
for _, one := range extractedCorColumns {
determinants.Insert(int(one.UniqueID))
groupByColsOutputCols.Insert(int(one.UniqueID))
}
notnull := util.IsNullRejected(la.SCtx(), la.Schema(), x)
if notnull || determinants.SubsetOf(fds.NotNullCols) {
notnullColsUniqueIDs.Insert(scalarUniqueID)
}
fds.AddStrictFunctionalDependency(determinants, intset.NewFastIntSet(scalarUniqueID))
}
}

// Some details:
// For now, with select max(a) from t group by c, tidb will see `max(a)` as a Max aggDes and `a,b,c` as firstRow aggDes,
// and keep them all in the schema columns before the projection does the pruning. If we built a fake FD, e.g. {c} ~~> {b},
// here because we have seen b as a firstRow aggDes, then for the upper-layer projection of `select max(a), b from t group by c`
// it would take b as a valid projection field of the group-by statement, since b already exists in the FD {c} ~~> {b}.
//
// And since any_value will NOT be pushed down to the agg schema, every firstRow aggDes in the agg logical operator
// is meaningless to build an FD with. So we only store the non-firstRow FD: {group by items} ~~> {real aggDes}.
realAggFuncUniqueID := intset.NewFastIntSet()
for i, aggDes := range la.AggFuncs {
if aggDes.Name != "firstrow" {
realAggFuncUniqueID.Insert(int(la.Schema().Columns[i].UniqueID))
}
}

// Apply the operator's characteristic FD setting.
if len(la.GroupByItems) == 0 {
// 1: as detailed above, the output cols of group by (normal columns seen as firstrow) are not validated;
// we couldn't merge them as constant FDs with the original constant FDs before the projection is done.
// fds.MaxOneRow(outputColsUniqueIDs.Union(groupByColsOutputCols))
//
// 2: for the convenience of later judgment, when there are no group-by items, we store an FD: {0} -> {real aggDes};
// the unique id 0 is only used here.
groupByColsUniqueIDs.Insert(0)
for i, ok := realAggFuncUniqueID.Next(0); ok; i, ok = realAggFuncUniqueID.Next(i + 1) {
fds.AddStrictFunctionalDependency(groupByColsUniqueIDs, intset.NewFastIntSet(i))
}
} else {
// eliminating input columns that are un-projected.
fds.ProjectCols(outputColsUniqueIDs.Union(groupByColsOutputCols).Union(groupByColsUniqueIDs))

// note: {a} --> {b,c} is not the same as {a} --> {b} and {a} --> {c}
for i, ok := realAggFuncUniqueID.Next(0); ok; i, ok = realAggFuncUniqueID.Next(i + 1) {
// The group-by phrase always produces a strict FD:
// 1: it can always distinguish and group the all-null/part-null group column rows.
// 2: rows with all/part-null group columns are unique rows after the group operation.
// 3: there won't be two identical group keys with different agg values, so the strict FD is secured.
fds.AddStrictFunctionalDependency(groupByColsUniqueIDs, intset.NewFastIntSet(i))
}

// The agg funcDesc has been tagged with the not-null flag when building the aggregation.
fds.MakeNotNull(notnullColsUniqueIDs)
}
fds.GroupByCols = groupByColsUniqueIDs
fds.HasAggBuilt = true
// just trace it down in every operator for test checking.
la.SetFDs(fds)
return fds
}

// CopyAggHints copies the aggHints from another LogicalAggregation.
func (la *LogicalAggregation) CopyAggHints(agg *LogicalAggregation) {
// TODO: Copy the hint may make the un-applicable hint throw the
// same warning message more than once. We'd better add a flag for
// `HaveThrownWarningMessage` to avoid this. Besides, finalAgg and
// partialAgg (in cascades planner) should share the same hint, instead
// of a copy.
la.PreferAggType = agg.PreferAggType
la.PreferAggToCop = agg.PreferAggToCop
}

// IsPartialModeAgg returns if all of the AggFuncs are partialMode.
func (la *LogicalAggregation) IsPartialModeAgg() bool {
// Since all of the AggFunc share the same AggMode, we only need to check the first one.
return la.AggFuncs[0].Mode == aggregation.Partial1Mode
}

// IsCompleteModeAgg returns if all of the AggFuncs are CompleteMode.
func (la *LogicalAggregation) IsCompleteModeAgg() bool {
// Since all of the AggFunc share the same AggMode, we only need to check the first one.
return la.AggFuncs[0].Mode == aggregation.CompleteMode
}

// GetGroupByCols returns the columns that are group-by items.
// For example, `group by a, b, c+d` will return [a, b].
func (la *LogicalAggregation) GetGroupByCols() []*expression.Column {
groupByCols := make([]*expression.Column, 0, len(la.GroupByItems))
for _, item := range la.GroupByItems {
if col, ok := item.(*expression.Column); ok {
groupByCols = append(groupByCols, col)
}
}
return groupByCols
}

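A standalone illustration of the filtering GetGroupByCols performs, with simplified group-by items instead of the planner's expression types:

package main

import "fmt"

// item is a simplified group-by item: either a plain column or a compound expression.
type item struct {
    col    string
    isExpr bool
}

// groupByCols keeps only the plain columns, as GetGroupByCols does.
func groupByCols(items []item) []string {
    cols := make([]string, 0, len(items))
    for _, it := range items {
        if !it.isExpr {
            cols = append(cols, it.col)
        }
    }
    return cols
}

func main() {
    // group by a, b, c+d  ->  [a b]
    fmt.Println(groupByCols([]item{{col: "a"}, {col: "b"}, {isExpr: true}}))
}
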
// GetPotentialPartitionKeys returns the potential partition keys for aggregation; the potential partition keys are the group-by keys.
func (la *LogicalAggregation) GetPotentialPartitionKeys() []*property.MPPPartitionColumn {
groupByCols := make([]*property.MPPPartitionColumn, 0, len(la.GroupByItems))
for _, item := range la.GroupByItems {
if col, ok := item.(*expression.Column); ok {
groupByCols = append(groupByCols, &property.MPPPartitionColumn{
Col: col,
CollateID: property.GetCollateIDByNameForPartition(col.GetStaticType().GetCollate()),
})
}
}
return groupByCols
}

// ExtractCorrelatedCols implements LogicalPlan interface.
func (la *LogicalAggregation) ExtractCorrelatedCols() []*expression.CorrelatedColumn {
corCols := make([]*expression.CorrelatedColumn, 0, len(la.GroupByItems)+len(la.AggFuncs))
for _, expr := range la.GroupByItems {
corCols = append(corCols, expression.ExtractCorColumns(expr)...)
}
for _, fun := range la.AggFuncs {
for _, arg := range fun.Args {
corCols = append(corCols, expression.ExtractCorColumns(arg)...)
}
for _, arg := range fun.OrderByItems {
corCols = append(corCols, expression.ExtractCorColumns(arg.Expr)...)
}
}
return corCols
}

// GetUsedCols extracts all of the Columns used by agg including GroupByItems and AggFuncs.
func (la *LogicalAggregation) GetUsedCols() (usedCols []*expression.Column) {
for _, groupByItem := range la.GroupByItems {
usedCols = append(usedCols, expression.ExtractColumns(groupByItem)...)
}
for _, aggDesc := range la.AggFuncs {
for _, expr := range aggDesc.Args {
usedCols = append(usedCols, expression.ExtractColumns(expr)...)
}
for _, expr := range aggDesc.OrderByItems {
usedCols = append(usedCols, expression.ExtractColumns(expr.Expr)...)
}
}
return usedCols
}

// LogicalSelection represents a where or having predicate.
type LogicalSelection struct {
logicalop.BaseLogicalPlan

@ -186,25 +186,3 @@ func (p *LogicalJoin) PreparePossibleProperties(_ *expression.Schema, childrenPr
}
return resultProperties
}

// PreparePossibleProperties implements base.LogicalPlan PreparePossibleProperties interface.
func (la *LogicalAggregation) PreparePossibleProperties(_ *expression.Schema, childrenProperties ...[][]*expression.Column) [][]*expression.Column {
childProps := childrenProperties[0]
// If there's no group-by item, the stream aggregation could have no order property. So we can add an empty property
// when its group-by item is empty.
if len(la.GroupByItems) == 0 {
la.PossibleProperties = [][]*expression.Column{nil}
return nil
}
resultProperties := make([][]*expression.Column, 0, len(childProps))
groupByCols := la.GetGroupByCols()
for _, possibleChildProperty := range childProps {
sortColOffsets := getMaxSortPrefix(possibleChildProperty, groupByCols)
if len(sortColOffsets) == len(groupByCols) {
prop := possibleChildProperty[:len(groupByCols)]
resultProperties = append(resultProperties, prop)
}
}
la.PossibleProperties = resultProperties
return resultProperties
}

@ -565,7 +565,7 @@ func (a *aggregationPushDownSolver) aggPushDown(p base.LogicalPlan, opt *optimiz
break
}
newGbyItems = append(newGbyItems, groupBy)
if ExprsHasSideEffects(newGbyItems) {
if expression.ExprsHasSideEffects(newGbyItems) {
noSideEffects = false
break
}
@ -586,7 +586,7 @@ func (a *aggregationPushDownSolver) aggPushDown(p base.LogicalPlan, opt *optimiz
}
newArgs = append(newArgs, newArg)
}
if ExprsHasSideEffects(newArgs) {
if expression.ExprsHasSideEffects(newArgs) {
noSideEffects = false
break
}
@ -603,7 +603,7 @@ func (a *aggregationPushDownSolver) aggPushDown(p base.LogicalPlan, opt *optimiz
}
newOrderByItems = append(newOrderByItems, byItem)
}
if ExprsHasSideEffects(newOrderByItems) {
if expression.ExprsHasSideEffects(newOrderByItems) {
noSideEffects = false
break
}

@ -45,36 +45,6 @@ func buildKeyInfo(lp base.LogicalPlan) {
lp.BuildKeyInfo(lp.Schema(), childSchema)
}

// BuildKeyInfo implements base.LogicalPlan BuildKeyInfo interface.
func (la *LogicalAggregation) BuildKeyInfo(selfSchema *expression.Schema, childSchema []*expression.Schema) {
// According to issue#46962, we can skip the partial-agg judgment when the agg function list is empty.
// Sometimes the agg is inside a subquery and there is an always-true condition in the where clause, so the agg function list is empty.
// For example: select xxxx from xxx WHERE TRUE = ALL ( SELECT TRUE GROUP BY 1 LIMIT 1 ) IS NULL IS NOT NULL;
// In this case, the agg is in complete mode and we can ignore this check.
if len(la.AggFuncs) != 0 && la.IsPartialModeAgg() {
return
}
la.LogicalSchemaProducer.BuildKeyInfo(selfSchema, childSchema)
la.buildSelfKeyInfo(selfSchema)
}

func (la *LogicalAggregation) buildSelfKeyInfo(selfSchema *expression.Schema) {
groupByCols := la.GetGroupByCols()
if len(groupByCols) == len(la.GroupByItems) && len(la.GroupByItems) > 0 {
indices := selfSchema.ColumnsIndices(groupByCols)
if indices != nil {
newKey := make([]*expression.Column, 0, len(indices))
for _, i := range indices {
newKey = append(newKey, selfSchema.Columns[i])
}
selfSchema.Keys = append(selfSchema.Keys, newKey)
}
}
if len(la.GroupByItems) == 0 {
la.SetMaxOneRow(true)
}
}

// If a condition is of the form (uniqueKey = constant) or (uniqueKey = correlated column), it returns at most one row.
// This function will check it.
func checkMaxOneRowCond(eqColIDs map[int64]struct{}, childSchema *expression.Schema) bool {

@ -19,9 +19,7 @@ import (
"slices"

"github.com/pingcap/tidb/pkg/expression"
"github.com/pingcap/tidb/pkg/expression/aggregation"
"github.com/pingcap/tidb/pkg/infoschema"
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/planner/core/base"
@ -44,33 +42,6 @@ func (*columnPruner) optimize(_ context.Context, lp base.LogicalPlan, opt *optim
return lp, planChanged, nil
}

// ExprsHasSideEffects checks if any of the expressions has side effects.
func ExprsHasSideEffects(exprs []expression.Expression) bool {
for _, expr := range exprs {
if exprHasSetVarOrSleep(expr) {
return true
}
}
return false
}

// exprHasSetVarOrSleep checks if the expression has SetVar function or Sleep function.
func exprHasSetVarOrSleep(expr expression.Expression) bool {
scalaFunc, isScalaFunc := expr.(*expression.ScalarFunction)
if !isScalaFunc {
return false
}
if scalaFunc.FuncName.L == ast.SetVar || scalaFunc.FuncName.L == ast.Sleep {
return true
}
for _, arg := range scalaFunc.GetArgs() {
if exprHasSetVarOrSleep(arg) {
return true
}
}
return false
}

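The recursion above descends into function arguments, so a side effect buried anywhere in an expression tree is found. A self-contained toy version (simplified expression tree, not TiDB's ScalarFunction):

package main

import "fmt"

// expr is a toy expression node: a function name plus its arguments.
type expr struct {
    fn   string
    args []expr
}

// hasSetVarOrSleep walks the tree exactly like exprHasSetVarOrSleep above.
func hasSetVarOrSleep(e expr) bool {
    if e.fn == "setvar" || e.fn == "sleep" {
        return true
    }
    for _, arg := range e.args {
        if hasSetVarOrSleep(arg) {
            return true
        }
    }
    return false
}

func main() {
    // concat(a, setvar('x', 1)): the side effect hides inside an argument.
    e := expr{fn: "concat", args: []expr{{fn: "column:a"}, {fn: "setvar"}}}
    fmt.Println(hasSetVarOrSleep(e)) // true
}
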
// PruneColumns implements the Expand OP's column pruning logic.
// logicalExpand is built in the logical plan building phase, where column pruning is not done yet. So the
// expand projection expressions are meaningless if built at that time. (we only maintain its schema, while
@ -108,7 +79,7 @@ func (p *LogicalProjection) PruneColumns(parentUsedCols []*expression.Column, op

// for implicit projected cols, once the ancestor doesn't use it, the implicit expr will be automatically pruned here.
for i := len(used) - 1; i >= 0; i-- {
if !used[i] && !exprHasSetVarOrSleep(p.Exprs[i]) {
if !used[i] && !expression.ExprHasSetVarOrSleep(p.Exprs[i]) {
prunedColumns = append(prunedColumns, p.Schema().Columns[i])
p.Schema().Columns = append(p.Schema().Columns[:i], p.Schema().Columns[i+1:]...)
p.Exprs = append(p.Exprs[:i], p.Exprs[i+1:]...)
@ -138,100 +109,6 @@ func (p *LogicalSelection) PruneColumns(parentUsedCols []*expression.Column, opt
return p, nil
}

// PruneColumns implements base.LogicalPlan interface.
func (la *LogicalAggregation) PruneColumns(parentUsedCols []*expression.Column, opt *optimizetrace.LogicalOptimizeOp) (base.LogicalPlan, error) {
child := la.Children()[0]
used := expression.GetUsedList(la.SCtx().GetExprCtx().GetEvalCtx(), parentUsedCols, la.Schema())
prunedColumns := make([]*expression.Column, 0)
prunedFunctions := make([]*aggregation.AggFuncDesc, 0)
prunedGroupByItems := make([]expression.Expression, 0)

allFirstRow := true
allRemainFirstRow := true
for i := len(used) - 1; i >= 0; i-- {
if la.AggFuncs[i].Name != ast.AggFuncFirstRow {
allFirstRow = false
}
if !used[i] && !ExprsHasSideEffects(la.AggFuncs[i].Args) {
prunedColumns = append(prunedColumns, la.Schema().Columns[i])
prunedFunctions = append(prunedFunctions, la.AggFuncs[i])
la.Schema().Columns = append(la.Schema().Columns[:i], la.Schema().Columns[i+1:]...)
la.AggFuncs = append(la.AggFuncs[:i], la.AggFuncs[i+1:]...)
} else if la.AggFuncs[i].Name != ast.AggFuncFirstRow {
allRemainFirstRow = false
}
}
logicaltrace.AppendColumnPruneTraceStep(la, prunedColumns, opt)
logicaltrace.AppendFunctionPruneTraceStep(la, prunedFunctions, opt)
//nolint: prealloc
var selfUsedCols []*expression.Column
for _, aggrFunc := range la.AggFuncs {
selfUsedCols = expression.ExtractColumnsFromExpressions(selfUsedCols, aggrFunc.Args, nil)

var cols []*expression.Column
aggrFunc.OrderByItems, cols = pruneByItems(la, aggrFunc.OrderByItems, opt)
selfUsedCols = append(selfUsedCols, cols...)
}
if len(la.AggFuncs) == 0 || (!allFirstRow && allRemainFirstRow) {
// If all the aggregate functions are pruned, we should add an aggregate function to maintain the info of row numbers.
// For all the aggregate functions except `first_row`, if we have an empty table defined as t(a,b),
// `select agg(a) from t` would always return one row, while `select agg(a) from t group by b` would return empty.
// For `first_row`, which is only used internally by tidb, `first_row(a)` would always return empty for empty input now.
var err error
var newAgg *aggregation.AggFuncDesc
if allFirstRow {
newAgg, err = aggregation.NewAggFuncDesc(la.SCtx().GetExprCtx(), ast.AggFuncFirstRow, []expression.Expression{expression.NewOne()}, false)
} else {
newAgg, err = aggregation.NewAggFuncDesc(la.SCtx().GetExprCtx(), ast.AggFuncCount, []expression.Expression{expression.NewOne()}, false)
}
if err != nil {
return nil, err
}
la.AggFuncs = append(la.AggFuncs, newAgg)
col := &expression.Column{
UniqueID: la.SCtx().GetSessionVars().AllocPlanColumnID(),
RetType: newAgg.RetTp,
}
la.Schema().Columns = append(la.Schema().Columns, col)
}

if len(la.GroupByItems) > 0 {
for i := len(la.GroupByItems) - 1; i >= 0; i-- {
cols := expression.ExtractColumns(la.GroupByItems[i])
if len(cols) == 0 && !exprHasSetVarOrSleep(la.GroupByItems[i]) {
prunedGroupByItems = append(prunedGroupByItems, la.GroupByItems[i])
la.GroupByItems = append(la.GroupByItems[:i], la.GroupByItems[i+1:]...)
} else {
selfUsedCols = append(selfUsedCols, cols...)
}
}
// If all the group-by items are pruned, we should add a constant 1 to keep the correctness.
// Because `select count(*) from t` is different from `select count(*) from t group by 1`.
if len(la.GroupByItems) == 0 {
la.GroupByItems = []expression.Expression{expression.NewOne()}
}
}
logicaltrace.AppendGroupByItemsPruneTraceStep(la, prunedGroupByItems, opt)
var err error
la.Children()[0], err = child.PruneColumns(selfUsedCols, opt)
if err != nil {
return nil, err
}
// update children[0]
child = la.Children()[0]
// Do an extra Projection Elimination here. This is specially for the empty Projection below an Aggregation.
// This kind of Projection would cause some bugs for the MPP plan and is safe to be removed.
// This kind of Projection should be removed in Projection Elimination, but currently PruneColumnsAgain is
// the last rule. So we specially handle this case here.
if childProjection, isProjection := child.(*LogicalProjection); isProjection {
if len(childProjection.Exprs) == 0 && childProjection.Schema().Len() == 0 {
childOfChild := childProjection.Children()[0]
la.SetChildren(childOfChild)
}
}
return la, nil
}

func pruneByItems(p base.LogicalPlan, old []*util.ByItems, opt *optimizetrace.LogicalOptimizeOp) (byItems []*util.ByItems,
parentUsedCols []*expression.Column) {
prunedByItems := make([]*util.ByItems, 0)

@ -42,22 +42,6 @@ func (la *LogicalApply) canPullUpAgg() bool {
return len(la.Children()[0].Schema().Keys) > 0
}

// canPullUp checks if an aggregation can be pulled up. An aggregate function like count(*) cannot be pulled up.
func (la *LogicalAggregation) canPullUp() bool {
if len(la.GroupByItems) > 0 {
return false
}
for _, f := range la.AggFuncs {
for _, arg := range f.Args {
expr := expression.EvaluateExprWithNull(la.SCtx().GetExprCtx(), la.Children()[0].Schema(), arg)
if con, ok := expr.(*expression.Constant); !ok || !con.Value.IsNull() {
return false
}
}
}
return true
}

// deCorColFromEqExpr checks whether it's an equal condition of form `col = correlated col`. If so we will change the decorrelated
// column to normal column to make a new equal condition.
func (la *LogicalApply) deCorColFromEqExpr(expr expression.Expression) expression.Expression {

@ -212,7 +212,7 @@ func (pe *projectionEliminator) eliminate(p base.LogicalPlan, replace map[string

// eliminate duplicate projection: projection with child projection
if isProj {
if child, ok := p.Children()[0].(*LogicalProjection); ok && !ExprsHasSideEffects(child.Exprs) {
if child, ok := p.Children()[0].(*LogicalProjection); ok && !expression.ExprsHasSideEffects(child.Exprs) {
ctx := p.SCtx()
for i := range proj.Exprs {
proj.Exprs[i] = ReplaceColumnOfExpr(proj.Exprs[i], child, child.Schema())
@ -276,21 +276,6 @@ func (p *LogicalProjection) ReplaceExprColumns(replace map[string]*expression.Co
}
}

// ReplaceExprColumns implements base.LogicalPlan interface.
func (la *LogicalAggregation) ReplaceExprColumns(replace map[string]*expression.Column) {
for _, agg := range la.AggFuncs {
for _, aggExpr := range agg.Args {
ResolveExprAndReplace(aggExpr, replace)
}
for _, orderExpr := range agg.OrderByItems {
ResolveExprAndReplace(orderExpr.Expr, replace)
}
}
for _, gbyItem := range la.GroupByItems {
ResolveExprAndReplace(gbyItem, replace)
}
}

// ReplaceExprColumns implements base.LogicalPlan interface.
func (p *LogicalSelection) ReplaceExprColumns(replace map[string]*expression.Column) {
for _, expr := range p.Conditions {

@ -431,125 +431,6 @@ func (p *LogicalUnionAll) PredicatePushDown(predicates []expression.Expression,
return nil, p
}

// pushDownPredicatesForAggregation splits a single condition into two parts: the part that can be pushed down below the aggregation and the part that cannot.
func (la *LogicalAggregation) pushDownPredicatesForAggregation(cond expression.Expression, groupByColumns *expression.Schema, exprsOriginal []expression.Expression) ([]expression.Expression, []expression.Expression) {
var condsToPush []expression.Expression
var ret []expression.Expression
switch cond.(type) {
case *expression.Constant:
condsToPush = append(condsToPush, cond)
// Consider the SQL "select sum(b) from t group by a having 1=0". "1=0" is a constant predicate that should be
// retained and pushed down at the same time; otherwise we would get a wrong query result containing one row
// with value 0 rather than an empty query result.
ret = append(ret, cond)
case *expression.ScalarFunction:
extractedCols := expression.ExtractColumns(cond)
ok := true
for _, col := range extractedCols {
if !groupByColumns.Contains(col) {
ok = false
break
}
}
if ok {
newFunc := expression.ColumnSubstitute(la.SCtx().GetExprCtx(), cond, la.Schema(), exprsOriginal)
condsToPush = append(condsToPush, newFunc)
} else {
ret = append(ret, cond)
}
default:
ret = append(ret, cond)
}
return condsToPush, ret
}

// pushDownCNFPredicatesForAggregation splits a CNF condition into two parts: the conditions that can be
// pushed down below the aggregation and those that cannot. For example, given
// (a > 1 or avg(b) > 1) and (a < 3), where `avg(b) > 1` can't be pushed down,
// condsToPush is `a < 3` and ret is `a > 1 or avg(b) > 1`.
func (la *LogicalAggregation) pushDownCNFPredicatesForAggregation(cond expression.Expression, groupByColumns *expression.Schema, exprsOriginal []expression.Expression) ([]expression.Expression, []expression.Expression) {
var condsToPush []expression.Expression
var ret []expression.Expression
subCNFItem := expression.SplitCNFItems(cond)
if len(subCNFItem) == 1 {
return la.pushDownPredicatesForAggregation(subCNFItem[0], groupByColumns, exprsOriginal)
}
exprCtx := la.SCtx().GetExprCtx()
for _, item := range subCNFItem {
condsToPushForItem, retForItem := la.pushDownDNFPredicatesForAggregation(item, groupByColumns, exprsOriginal)
if len(condsToPushForItem) > 0 {
condsToPush = append(condsToPush, expression.ComposeDNFCondition(exprCtx, condsToPushForItem...))
}
if len(retForItem) > 0 {
ret = append(ret, expression.ComposeDNFCondition(exprCtx, retForItem...))
}
}
return condsToPush, ret
}

// pushDownDNFPredicatesForAggregation splits a DNF condition into two parts: the conditions that can be
// pushed down below the aggregation and those that cannot. For example, given
// (a > 1 and avg(b) > 1) or (a < 3), where `avg(b) > 1` can't be pushed down,
// condsToPush is `(a > 1) or (a < 3)` and ret is `(a > 1 and avg(b) > 1) or (a < 3)`.
func (la *LogicalAggregation) pushDownDNFPredicatesForAggregation(cond expression.Expression, groupByColumns *expression.Schema, exprsOriginal []expression.Expression) ([]expression.Expression, []expression.Expression) {
//nolint: prealloc
var condsToPush []expression.Expression
var ret []expression.Expression
subDNFItem := expression.SplitDNFItems(cond)
if len(subDNFItem) == 1 {
return la.pushDownPredicatesForAggregation(subDNFItem[0], groupByColumns, exprsOriginal)
}
exprCtx := la.SCtx().GetExprCtx()
for _, item := range subDNFItem {
condsToPushForItem, retForItem := la.pushDownCNFPredicatesForAggregation(item, groupByColumns, exprsOriginal)
if len(condsToPushForItem) <= 0 {
return nil, []expression.Expression{cond}
}
condsToPush = append(condsToPush, expression.ComposeCNFCondition(exprCtx, condsToPushForItem...))
if len(retForItem) > 0 {
ret = append(ret, expression.ComposeCNFCondition(exprCtx, retForItem...))
}
}
if len(ret) == 0 {
// All the conditions can be pushed down.
return []expression.Expression{cond}, nil
}
dnfPushDownCond := expression.ComposeDNFCondition(exprCtx, condsToPush...)
// Some conditions can't be pushed down; we need to keep all of them.
return []expression.Expression{dnfPushDownCond}, []expression.Expression{cond}
}

// splitCondForAggregation splits the conditions into those that can be pushed down and those that cannot.
func (la *LogicalAggregation) splitCondForAggregation(predicates []expression.Expression) ([]expression.Expression, []expression.Expression) {
var condsToPush []expression.Expression
var ret []expression.Expression
exprsOriginal := make([]expression.Expression, 0, len(la.AggFuncs))
for _, fun := range la.AggFuncs {
exprsOriginal = append(exprsOriginal, fun.Args[0])
}
groupByColumns := expression.NewSchema(la.GetGroupByCols()...)
// It's almost the same as pushDownCNFPredicatesForAggregation, except that the condition is a slice.
for _, cond := range predicates {
subCondsToPush, subRet := la.pushDownDNFPredicatesForAggregation(cond, groupByColumns, exprsOriginal)
if len(subCondsToPush) > 0 {
condsToPush = append(condsToPush, subCondsToPush...)
}
if len(subRet) > 0 {
ret = append(ret, subRet...)
}
}
return condsToPush, ret
}

// PredicatePushDown implements base.LogicalPlan PredicatePushDown interface.
func (la *LogicalAggregation) PredicatePushDown(predicates []expression.Expression, opt *optimizetrace.LogicalOptimizeOp) ([]expression.Expression, base.LogicalPlan) {
condsToPush, ret := la.splitCondForAggregation(predicates)
la.BaseLogicalPlan.PredicatePushDown(condsToPush, opt)
return ret, la
}

// PredicatePushDown implements base.LogicalPlan PredicatePushDown interface.
func (p *LogicalLimit) PredicatePushDown(predicates []expression.Expression, opt *optimizetrace.LogicalOptimizeOp) ([]expression.Expression, base.LogicalPlan) {
// Limit forbids any condition to push down.

@ -685,66 +685,6 @@ func (p *LogicalProjection) ExtractColGroups(colGroups [][]*expression.Column) [
return extracted
}

func (*LogicalAggregation) getGroupNDVs(colGroups [][]*expression.Column, childProfile *property.StatsInfo, gbyCols []*expression.Column) []property.GroupNDV {
if len(colGroups) == 0 {
return nil
}
// Check if the child profile provides GroupNDV for the GROUP BY columns.
// Note that gbyCols may not be the exact GROUP BY columns, e.g., GROUP BY a+b,
// but we have no other approaches for the NDV estimation of these cases
// except for using the independent assumption, unless we can use stats of expression index.
groupNDV := childProfile.GetGroupNDV4Cols(gbyCols)
if groupNDV == nil {
return nil
}
return []property.GroupNDV{*groupNDV}
}

// DeriveStats implements LogicalPlan DeriveStats interface.
func (la *LogicalAggregation) DeriveStats(childStats []*property.StatsInfo, selfSchema *expression.Schema, childSchema []*expression.Schema, colGroups [][]*expression.Column) (*property.StatsInfo, error) {
childProfile := childStats[0]
gbyCols := make([]*expression.Column, 0, len(la.GroupByItems))
for _, gbyExpr := range la.GroupByItems {
cols := expression.ExtractColumns(gbyExpr)
gbyCols = append(gbyCols, cols...)
}
if la.StatsInfo() != nil {
// Reload GroupNDVs since colGroups may have changed.
la.StatsInfo().GroupNDVs = la.getGroupNDVs(colGroups, childProfile, gbyCols)
return la.StatsInfo(), nil
}
ndv, _ := cardinality.EstimateColsNDVWithMatchedLen(gbyCols, childSchema[0], childProfile)
la.SetStats(&property.StatsInfo{
RowCount: ndv,
ColNDVs: make(map[int64]float64, selfSchema.Len()),
})
// We cannot estimate the ColNDVs for every output, so we use a conservative strategy.
for _, col := range selfSchema.Columns {
la.StatsInfo().ColNDVs[col.UniqueID] = ndv
}
la.InputCount = childProfile.RowCount
la.StatsInfo().GroupNDVs = la.getGroupNDVs(colGroups, childProfile, gbyCols)
return la.StatsInfo(), nil
}

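The core estimate in DeriveStats is that an aggregation emits one row per distinct group-by key, i.e. its output row count is the NDV of the group-by columns. A toy illustration over concrete data (not the planner's statistics machinery):

package main

import "fmt"

// estimateRows returns the NDV of the group-by key over the sample, which is
// the aggregation's estimated output row count in the function above.
func estimateRows(groupKeys []string) float64 {
    distinct := make(map[string]struct{}, len(groupKeys))
    for _, k := range groupKeys {
        distinct[k] = struct{}{}
    }
    return float64(len(distinct))
}

func main() {
    // group by a over rows with a = 1,1,2,2,3 -> 3 output rows expected.
    fmt.Println(estimateRows([]string{"1", "1", "2", "2", "3"})) // 3
}
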
// ExtractColGroups implements LogicalPlan ExtractColGroups interface.
func (la *LogicalAggregation) ExtractColGroups(_ [][]*expression.Column) [][]*expression.Column {
// Parent colGroups would be discarded, because aggregation would make the NDV of colGroups
// that do not match GroupByItems invalid.
// Note that gbyCols may not be the exact GROUP BY columns, e.g., GROUP BY a+b,
// but we have no other approaches for the NDV estimation of these cases
// except for using the independent assumption, unless we can use stats of expression index.
gbyCols := make([]*expression.Column, 0, len(la.GroupByItems))
for _, gbyExpr := range la.GroupByItems {
cols := expression.ExtractColumns(gbyExpr)
gbyCols = append(gbyCols, cols...)
}
if len(gbyCols) > 1 {
return [][]*expression.Column{expression.SortColumns(gbyCols)}
}
return nil
}

func (p *LogicalJoin) getGroupNDVs(colGroups [][]*expression.Column, childStats []*property.StatsInfo) []property.GroupNDV {
outerIdx := int(-1)
if p.JoinType == LeftOuterJoin || p.JoinType == LeftOuterSemiJoin || p.JoinType == AntiLeftOuterSemiJoin {

@ -96,3 +96,17 @@ func EncodeIntAsUint32(result []byte, value int) []byte {
binary.BigEndian.PutUint32(buf[:], uint32(value))
return append(result, buf[:]...)
}

// GetMaxSortPrefix returns the offsets of the longest prefix of sortCols whose columns all appear in allCols.
func GetMaxSortPrefix(sortCols, allCols []*expression.Column) []int {
tmpSchema := expression.NewSchema(allCols...)
sortColOffsets := make([]int, 0, len(sortCols))
for _, sortCol := range sortCols {
offset := tmpSchema.ColumnIndex(sortCol)
if offset == -1 {
return sortColOffsets
}
sortColOffsets = append(sortColOffsets, offset)
}
return sortColOffsets
}

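A runnable sketch of the prefix computation, with column names standing in for *expression.Column (same logic, hypothetical helper):

package main

import "fmt"

// getMaxSortPrefix mirrors the loop above with string column names.
func getMaxSortPrefix(sortCols, allCols []string) []int {
    pos := make(map[string]int, len(allCols))
    for i, c := range allCols {
        pos[c] = i
    }
    offsets := make([]int, 0, len(sortCols))
    for _, c := range sortCols {
        i, ok := pos[c]
        if !ok {
            return offsets // stop at the first sort column missing from allCols
        }
        offsets = append(offsets, i)
    }
    return offsets
}

func main() {
    fmt.Println(getMaxSortPrefix([]string{"b", "a", "x"}, []string{"a", "b", "c"})) // [1 0]
}
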
@ -10,6 +10,7 @@ go_library(
"//pkg/kv",
"//pkg/planner/core/base",
"//pkg/planner/property",
"//pkg/planner/util",
"//pkg/planner/util/optimizetrace",
],
)

@ -19,6 +19,7 @@ import (
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/planner/core/base"
"github.com/pingcap/tidb/pkg/planner/property"
"github.com/pingcap/tidb/pkg/planner/util"
"github.com/pingcap/tidb/pkg/planner/util/optimizetrace"
)

@ -68,5 +69,22 @@ var FindBestTask func(p base.LogicalPlan, prop *property.PhysicalProperty, planC

// CanPushToCopImpl will be called by baseLogicalPlan in logicalOp pkg. The logic inside covers concrete logical
// operators.
// todo: (7) arenatlx, remove this util func pointer when logical operators are all moved from core to logicalop.
// todo: (7) arenatlx, remove this util func pointer when logical operators are all moved from core to logicalOp.
var CanPushToCopImpl func(p base.LogicalPlan, storeTp kv.StoreType, considerDual bool) bool

// GetStreamAggs will be called by baseLogicalPlan in logicalOp pkg. The logic inside covers concrete physical
// operators.
// todo: (8) arenatlx, move this util func pointer to physicalOp when physical operators are all moved.
var GetStreamAggs func(lp base.LogicalPlan, prop *property.PhysicalProperty) []base.PhysicalPlan

// GetHashAggs will be called by baseLogicalPlan in logicalOp pkg. The logic inside covers concrete physical
// operators.
// todo: (9) arenatlx, move this util func pointer to physicalOp when physical operators are all moved.
var GetHashAggs func(la base.LogicalPlan, prop *property.PhysicalProperty) []base.PhysicalPlan

// PruneByItems will be called by baseLogicalPlan in logicalOp pkg. The logic currently exists for rule logic
// inside core.
// todo: (10) arenatlx, when rule is moved out of core, we should direct ref the rule.Func instead of this
// util func pointer.
var PruneByItems func(p base.LogicalPlan, old []*util.ByItems, opt *optimizetrace.LogicalOptimizeOp) (
byItems []*util.ByItems, parentUsedCols []*expression.Column)

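The function-pointer variables above exist to break an import cycle while operators migrate out of core: a lower-level package calls through a variable that the core package assigns during init(). A minimal standalone sketch of that wiring, with hypothetical names:

package main

import "fmt"

// GetHashAggs stands in for the utilfuncp variable: declared where the callers
// live, so they need no import of the implementing package.
var GetHashAggs func(plan string) []string

// init stands in for the wiring done in the core package's init() above.
func init() {
    GetHashAggs = func(plan string) []string {
        return []string{plan + " -> HashAgg"}
    }
}

func main() {
    fmt.Println(GetHashAggs("LogicalAggregation")) // [LogicalAggregation -> HashAgg]
}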