planner/core: stabilize the execution order of window functions (#11095)
This commit is contained in:
@ -18,6 +18,7 @@ import (
|
||||
"math"
|
||||
"math/bits"
|
||||
"reflect"
|
||||
"sort"
|
||||
"strings"
|
||||
"unicode"
|
||||
|
||||
@ -25,6 +26,7 @@ import (
|
||||
"github.com/pingcap/errors"
|
||||
"github.com/pingcap/parser"
|
||||
"github.com/pingcap/parser/ast"
|
||||
"github.com/pingcap/parser/format"
|
||||
"github.com/pingcap/parser/model"
|
||||
"github.com/pingcap/parser/mysql"
|
||||
"github.com/pingcap/parser/opcode"
|
||||
@ -3103,11 +3105,70 @@ func (b *PlanBuilder) buildWindowFunctionFrame(spec *ast.WindowSpec, orderByItem
|
||||
return frame, err
|
||||
}
|
||||
|
||||
func getAllByItems(itemsBuf []*ast.ByItem, spec *ast.WindowSpec) []*ast.ByItem {
|
||||
itemsBuf = itemsBuf[:0]
|
||||
if spec.PartitionBy != nil {
|
||||
itemsBuf = append(itemsBuf, spec.PartitionBy.Items...)
|
||||
}
|
||||
if spec.OrderBy != nil {
|
||||
itemsBuf = append(itemsBuf, spec.OrderBy.Items...)
|
||||
}
|
||||
return itemsBuf
|
||||
}
|
||||
|
||||
func restoreByItemText(item *ast.ByItem) string {
|
||||
var sb strings.Builder
|
||||
ctx := format.NewRestoreCtx(0, &sb)
|
||||
err := item.Expr.Restore(ctx)
|
||||
if err != nil {
|
||||
return ""
|
||||
}
|
||||
return sb.String()
|
||||
}
|
||||
|
||||
func compareItems(lItems []*ast.ByItem, rItems []*ast.ByItem) bool {
|
||||
minLen := mathutil.Min(len(lItems), len(rItems))
|
||||
for i := 0; i < minLen; i++ {
|
||||
res := strings.Compare(restoreByItemText(lItems[i]), restoreByItemText(rItems[i]))
|
||||
if res != 0 {
|
||||
return res < 0
|
||||
}
|
||||
res = compareBool(lItems[i].Desc, rItems[i].Desc)
|
||||
if res != 0 {
|
||||
return res < 0
|
||||
}
|
||||
}
|
||||
return len(lItems) < len(rItems)
|
||||
}
|
||||
|
||||
type windowFuncs struct {
|
||||
spec *ast.WindowSpec
|
||||
funcs []*ast.WindowFuncExpr
|
||||
}
|
||||
|
||||
// sortWindowSpecs sorts the window specifications by reversed alphabetical order, then we could add less `Sort` operator
|
||||
// in physical plan because the window functions with the same partition by and order by clause will be at near places.
|
||||
func sortWindowSpecs(groupedFuncs map[*ast.WindowSpec][]*ast.WindowFuncExpr) []windowFuncs {
|
||||
windows := make([]windowFuncs, 0, len(groupedFuncs))
|
||||
for spec, funcs := range groupedFuncs {
|
||||
windows = append(windows, windowFuncs{spec, funcs})
|
||||
}
|
||||
lItemsBuf := make([]*ast.ByItem, 0, 4)
|
||||
rItemsBuf := make([]*ast.ByItem, 0, 4)
|
||||
sort.SliceStable(windows, func(i, j int) bool {
|
||||
lItemsBuf = getAllByItems(lItemsBuf, windows[i].spec)
|
||||
rItemsBuf = getAllByItems(rItemsBuf, windows[j].spec)
|
||||
return !compareItems(lItemsBuf, rItemsBuf)
|
||||
})
|
||||
return windows
|
||||
}
|
||||
|
||||
func (b *PlanBuilder) buildWindowFunctions(p LogicalPlan, groupedFuncs map[*ast.WindowSpec][]*ast.WindowFuncExpr, aggMap map[*ast.AggregateFuncExpr]int) (LogicalPlan, map[*ast.WindowFuncExpr]int, error) {
|
||||
args := make([]ast.ExprNode, 0, 4)
|
||||
windowMap := make(map[*ast.WindowFuncExpr]int)
|
||||
for spec, funcs := range groupedFuncs {
|
||||
for _, window := range sortWindowSpecs(groupedFuncs) {
|
||||
args = args[:0]
|
||||
spec, funcs := window.spec, window.funcs
|
||||
for _, windowFunc := range funcs {
|
||||
args = append(args, windowFunc.Args...)
|
||||
}
|
||||
|
||||
@ -2317,6 +2317,11 @@ func (s *testPlanSuite) TestWindowFunction(c *C) {
|
||||
sql: "delete from t order by (sum(a) over())",
|
||||
result: "[planner:3593]You cannot use the window function 'sum' in this context.'",
|
||||
},
|
||||
{
|
||||
// The best execution order should be (a,c), (a, b, c), (a, b), (), it requires only 2 sort operations.
|
||||
sql: "select sum(a) over (partition by a order by b), sum(b) over (order by a, b, c), sum(c) over(partition by a order by c), sum(d) over() from t",
|
||||
result: "TableReader(Table(t))->Sort->Window(sum(cast(test.t.c)) over(partition by test.t.a order by test.t.c asc range between unbounded preceding and current row))->Sort->Window(sum(cast(test.t.b)) over(order by test.t.a asc, test.t.b asc, test.t.c asc range between unbounded preceding and current row))->Window(sum(cast(test.t.a)) over(partition by test.t.a order by test.t.b asc range between unbounded preceding and current row))->Window(sum(cast(test.t.d)) over())->Projection",
|
||||
},
|
||||
}
|
||||
|
||||
s.Parser.EnableWindowFunc(true)
|
||||
|
||||
Reference in New Issue
Block a user