planner: move window frame definition to logical window for logical convergence (#54567)
ref pingcap/tidb#51664, ref pingcap/tidb#52714
This commit is contained in:
@ -16,14 +16,10 @@ package core
|
||||
|
||||
import (
|
||||
"github.com/pingcap/tidb/pkg/expression"
|
||||
"github.com/pingcap/tidb/pkg/parser/ast"
|
||||
"github.com/pingcap/tidb/pkg/planner/core/base"
|
||||
fd "github.com/pingcap/tidb/pkg/planner/funcdep"
|
||||
"github.com/pingcap/tidb/pkg/planner/util"
|
||||
"github.com/pingcap/tidb/pkg/sessionctx"
|
||||
"github.com/pingcap/tidb/pkg/types"
|
||||
"github.com/pingcap/tidb/pkg/util/intset"
|
||||
"github.com/pingcap/tipb/go-tipb"
|
||||
)
|
||||
|
||||
var (
|
||||
@ -150,105 +146,3 @@ func ExtractEquivalenceCols(conditions []expression.Expression, sctx base.PlanCo
|
||||
}
|
||||
return equivUniqueIDs
|
||||
}
|
||||
|
||||
// WindowFrame represents a window function frame.
|
||||
type WindowFrame struct {
|
||||
Type ast.FrameType
|
||||
Start *FrameBound
|
||||
End *FrameBound
|
||||
}
|
||||
|
||||
// Clone copies a window frame totally.
|
||||
func (wf *WindowFrame) Clone() *WindowFrame {
|
||||
cloned := new(WindowFrame)
|
||||
*cloned = *wf
|
||||
|
||||
cloned.Start = wf.Start.Clone()
|
||||
cloned.End = wf.End.Clone()
|
||||
|
||||
return cloned
|
||||
}
|
||||
|
||||
// FrameBound is the boundary of a frame.
|
||||
type FrameBound struct {
|
||||
Type ast.BoundType
|
||||
UnBounded bool
|
||||
Num uint64
|
||||
// CalcFuncs is used for range framed windows.
|
||||
// We will build the date_add or date_sub functions for frames like `INTERVAL '2:30' MINUTE_SECOND FOLLOWING`,
|
||||
// and plus or minus for frames like `1 preceding`.
|
||||
CalcFuncs []expression.Expression
|
||||
// Sometimes we need to cast order by column to a specific type when frame type is range
|
||||
CompareCols []expression.Expression
|
||||
// CmpFuncs is used to decide whether one row is included in the current frame.
|
||||
CmpFuncs []expression.CompareFunc
|
||||
// This field is used for passing information to tiflash
|
||||
CmpDataType tipb.RangeCmpDataType
|
||||
// IsExplicitRange marks if this range explicitly appears in the sql
|
||||
IsExplicitRange bool
|
||||
}
|
||||
|
||||
// Clone copies a frame bound totally.
|
||||
func (fb *FrameBound) Clone() *FrameBound {
|
||||
cloned := new(FrameBound)
|
||||
*cloned = *fb
|
||||
|
||||
cloned.CalcFuncs = make([]expression.Expression, 0, len(fb.CalcFuncs))
|
||||
for _, it := range fb.CalcFuncs {
|
||||
cloned.CalcFuncs = append(cloned.CalcFuncs, it.Clone())
|
||||
}
|
||||
cloned.CmpFuncs = fb.CmpFuncs
|
||||
|
||||
return cloned
|
||||
}
|
||||
|
||||
func (fb *FrameBound) updateCmpFuncsAndCmpDataType(cmpDataType types.EvalType) {
|
||||
// When cmpDataType can't match to any condition, we can ignore it.
|
||||
//
|
||||
// For example:
|
||||
// `create table test.range_test(p int not null,o text not null,v int not null);`
|
||||
// `select *, first_value(v) over (partition by p order by o) as a from range_test;`
|
||||
// The sql's frame type is range, but the cmpDataType is ETString and when the user explicitly use range frame
|
||||
// the sql will raise error before generating logical plan, so it's ok to ignore it.
|
||||
switch cmpDataType {
|
||||
case types.ETInt:
|
||||
fb.CmpFuncs[0] = expression.CompareInt
|
||||
fb.CmpDataType = tipb.RangeCmpDataType_Int
|
||||
case types.ETDatetime, types.ETTimestamp:
|
||||
fb.CmpFuncs[0] = expression.CompareTime
|
||||
fb.CmpDataType = tipb.RangeCmpDataType_DateTime
|
||||
case types.ETDuration:
|
||||
fb.CmpFuncs[0] = expression.CompareDuration
|
||||
fb.CmpDataType = tipb.RangeCmpDataType_Duration
|
||||
case types.ETReal:
|
||||
fb.CmpFuncs[0] = expression.CompareReal
|
||||
fb.CmpDataType = tipb.RangeCmpDataType_Float
|
||||
case types.ETDecimal:
|
||||
fb.CmpFuncs[0] = expression.CompareDecimal
|
||||
fb.CmpDataType = tipb.RangeCmpDataType_Decimal
|
||||
}
|
||||
}
|
||||
|
||||
// UpdateCompareCols will update CompareCols.
|
||||
func (fb *FrameBound) UpdateCompareCols(ctx sessionctx.Context, orderByCols []*expression.Column) error {
|
||||
ectx := ctx.GetExprCtx().GetEvalCtx()
|
||||
|
||||
if len(fb.CalcFuncs) > 0 {
|
||||
fb.CompareCols = make([]expression.Expression, len(orderByCols))
|
||||
if fb.CalcFuncs[0].GetType(ectx).EvalType() != orderByCols[0].GetType(ectx).EvalType() {
|
||||
var err error
|
||||
fb.CompareCols[0], err = expression.NewFunctionBase(ctx.GetExprCtx(), ast.Cast, fb.CalcFuncs[0].GetType(ectx), orderByCols[0])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
for i, col := range orderByCols {
|
||||
fb.CompareCols[i] = col
|
||||
}
|
||||
}
|
||||
|
||||
cmpDataType := expression.GetAccurateCmpType(ctx.GetExprCtx().GetEvalCtx(), fb.CompareCols[0], fb.CalcFuncs[0])
|
||||
fb.updateCmpFuncsAndCmpDataType(cmpDataType)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -17,12 +17,15 @@ package core
|
||||
import (
|
||||
"github.com/pingcap/tidb/pkg/expression"
|
||||
"github.com/pingcap/tidb/pkg/expression/aggregation"
|
||||
"github.com/pingcap/tidb/pkg/parser/ast"
|
||||
"github.com/pingcap/tidb/pkg/planner/core/base"
|
||||
"github.com/pingcap/tidb/pkg/planner/core/operator/logicalop"
|
||||
"github.com/pingcap/tidb/pkg/planner/property"
|
||||
"github.com/pingcap/tidb/pkg/planner/util/optimizetrace"
|
||||
"github.com/pingcap/tidb/pkg/sessionctx"
|
||||
"github.com/pingcap/tidb/pkg/types"
|
||||
"github.com/pingcap/tidb/pkg/util/plancodec"
|
||||
"github.com/pingcap/tipb/go-tipb"
|
||||
)
|
||||
|
||||
// LogicalWindow represents a logical window function plan.
|
||||
@ -35,6 +38,108 @@ type LogicalWindow struct {
|
||||
Frame *WindowFrame
|
||||
}
|
||||
|
||||
// WindowFrame represents a window function frame.
|
||||
type WindowFrame struct {
|
||||
Type ast.FrameType
|
||||
Start *FrameBound
|
||||
End *FrameBound
|
||||
}
|
||||
|
||||
// Clone copies a window frame totally.
|
||||
func (wf *WindowFrame) Clone() *WindowFrame {
|
||||
cloned := new(WindowFrame)
|
||||
*cloned = *wf
|
||||
|
||||
cloned.Start = wf.Start.Clone()
|
||||
cloned.End = wf.End.Clone()
|
||||
|
||||
return cloned
|
||||
}
|
||||
|
||||
// FrameBound is the boundary of a frame.
|
||||
type FrameBound struct {
|
||||
Type ast.BoundType
|
||||
UnBounded bool
|
||||
Num uint64
|
||||
// CalcFuncs is used for range framed windows.
|
||||
// We will build the date_add or date_sub functions for frames like `INTERVAL '2:30' MINUTE_SECOND FOLLOWING`,
|
||||
// and plus or minus for frames like `1 preceding`.
|
||||
CalcFuncs []expression.Expression
|
||||
// Sometimes we need to cast order by column to a specific type when frame type is range
|
||||
CompareCols []expression.Expression
|
||||
// CmpFuncs is used to decide whether one row is included in the current frame.
|
||||
CmpFuncs []expression.CompareFunc
|
||||
// This field is used for passing information to tiflash
|
||||
CmpDataType tipb.RangeCmpDataType
|
||||
// IsExplicitRange marks if this range explicitly appears in the sql
|
||||
IsExplicitRange bool
|
||||
}
|
||||
|
||||
// Clone copies a frame bound totally.
|
||||
func (fb *FrameBound) Clone() *FrameBound {
|
||||
cloned := new(FrameBound)
|
||||
*cloned = *fb
|
||||
|
||||
cloned.CalcFuncs = make([]expression.Expression, 0, len(fb.CalcFuncs))
|
||||
for _, it := range fb.CalcFuncs {
|
||||
cloned.CalcFuncs = append(cloned.CalcFuncs, it.Clone())
|
||||
}
|
||||
cloned.CmpFuncs = fb.CmpFuncs
|
||||
|
||||
return cloned
|
||||
}
|
||||
|
||||
func (fb *FrameBound) updateCmpFuncsAndCmpDataType(cmpDataType types.EvalType) {
|
||||
// When cmpDataType can't match to any condition, we can ignore it.
|
||||
//
|
||||
// For example:
|
||||
// `create table test.range_test(p int not null,o text not null,v int not null);`
|
||||
// `select *, first_value(v) over (partition by p order by o) as a from range_test;`
|
||||
// The sql's frame type is range, but the cmpDataType is ETString and when the user explicitly use range frame
|
||||
// the sql will raise error before generating logical plan, so it's ok to ignore it.
|
||||
switch cmpDataType {
|
||||
case types.ETInt:
|
||||
fb.CmpFuncs[0] = expression.CompareInt
|
||||
fb.CmpDataType = tipb.RangeCmpDataType_Int
|
||||
case types.ETDatetime, types.ETTimestamp:
|
||||
fb.CmpFuncs[0] = expression.CompareTime
|
||||
fb.CmpDataType = tipb.RangeCmpDataType_DateTime
|
||||
case types.ETDuration:
|
||||
fb.CmpFuncs[0] = expression.CompareDuration
|
||||
fb.CmpDataType = tipb.RangeCmpDataType_Duration
|
||||
case types.ETReal:
|
||||
fb.CmpFuncs[0] = expression.CompareReal
|
||||
fb.CmpDataType = tipb.RangeCmpDataType_Float
|
||||
case types.ETDecimal:
|
||||
fb.CmpFuncs[0] = expression.CompareDecimal
|
||||
fb.CmpDataType = tipb.RangeCmpDataType_Decimal
|
||||
}
|
||||
}
|
||||
|
||||
// UpdateCompareCols will update CompareCols.
|
||||
func (fb *FrameBound) UpdateCompareCols(ctx sessionctx.Context, orderByCols []*expression.Column) error {
|
||||
ectx := ctx.GetExprCtx().GetEvalCtx()
|
||||
|
||||
if len(fb.CalcFuncs) > 0 {
|
||||
fb.CompareCols = make([]expression.Expression, len(orderByCols))
|
||||
if fb.CalcFuncs[0].GetType(ectx).EvalType() != orderByCols[0].GetType(ectx).EvalType() {
|
||||
var err error
|
||||
fb.CompareCols[0], err = expression.NewFunctionBase(ctx.GetExprCtx(), ast.Cast, fb.CalcFuncs[0].GetType(ectx), orderByCols[0])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
for i, col := range orderByCols {
|
||||
fb.CompareCols[i] = col
|
||||
}
|
||||
}
|
||||
|
||||
cmpDataType := expression.GetAccurateCmpType(ctx.GetExprCtx().GetEvalCtx(), fb.CompareCols[0], fb.CalcFuncs[0])
|
||||
fb.updateCmpFuncsAndCmpDataType(cmpDataType)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Init initializes LogicalWindow.
|
||||
func (p LogicalWindow) Init(ctx base.PlanContext, offset int) *LogicalWindow {
|
||||
p.BaseLogicalPlan = logicalop.NewBaseLogicalPlan(ctx, plancodec.TypeWindow, &p, offset)
|
||||
|
||||
Reference in New Issue
Block a user