Merge pull request #1084 from pingcap/hanfei
executor: support limit sort
This commit is contained in:
@ -333,6 +333,7 @@ func (b *executorBuilder) buildSort(v *plan.Sort) Executor {
|
||||
Src: src,
|
||||
ByItems: v.ByItems,
|
||||
ctx: b.ctx,
|
||||
Limit: v.ExecLimit,
|
||||
}
|
||||
return e
|
||||
}
|
||||
|
||||
@ -856,6 +856,7 @@ type SortExec struct {
|
||||
ByItems []*ast.ByItem
|
||||
Rows []*orderByRow
|
||||
ctx context.Context
|
||||
Limit *plan.Limit
|
||||
Idx int
|
||||
fetched bool
|
||||
err error
|
||||
@ -902,9 +903,18 @@ func (e *SortExec) Less(i, j int) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
// SortBufferSize represents the total extra row count that sort can use.
|
||||
var SortBufferSize = 500
|
||||
|
||||
// Next implements Executor Next interface.
|
||||
func (e *SortExec) Next() (*Row, error) {
|
||||
if !e.fetched {
|
||||
offset := -1
|
||||
totalCount := -1
|
||||
if e.Limit != nil {
|
||||
offset = int(e.Limit.Offset)
|
||||
totalCount = offset + int(e.Limit.Count)
|
||||
}
|
||||
for {
|
||||
srcRow, err := e.Src.Next()
|
||||
if err != nil {
|
||||
@ -924,8 +934,15 @@ func (e *SortExec) Next() (*Row, error) {
|
||||
}
|
||||
}
|
||||
e.Rows = append(e.Rows, orderRow)
|
||||
if totalCount != -1 && e.Len() >= totalCount+SortBufferSize {
|
||||
sort.Sort(e)
|
||||
e.Rows = e.Rows[:totalCount]
|
||||
}
|
||||
}
|
||||
sort.Sort(e)
|
||||
if offset > 0 {
|
||||
e.Rows = e.Rows[offset:totalCount]
|
||||
}
|
||||
e.fetched = true
|
||||
}
|
||||
if e.err != nil {
|
||||
|
||||
@ -594,6 +594,25 @@ func (s *testSuite) TestSelectOrderBy(c *C) {
|
||||
r.Check(testkit.Rows(rowStr))
|
||||
tk.MustExec("commit")
|
||||
|
||||
// Test limit + order by
|
||||
tk.MustExec("begin")
|
||||
executor.SortBufferSize = 10
|
||||
for i := 3; i <= 10; i += 1 {
|
||||
tk.MustExec(fmt.Sprintf("insert INTO select_order_test VALUES (%d, \"zz\");", i))
|
||||
}
|
||||
tk.MustExec("insert INTO select_order_test VALUES (10086, \"hi\");")
|
||||
for i := 11; i <= 20; i += 1 {
|
||||
tk.MustExec(fmt.Sprintf("insert INTO select_order_test VALUES (%d, \"hh\");", i))
|
||||
}
|
||||
for i := 21; i <= 30; i += 1 {
|
||||
tk.MustExec(fmt.Sprintf("insert INTO select_order_test VALUES (%d, \"zz\");", i))
|
||||
}
|
||||
tk.MustExec("insert INTO select_order_test VALUES (1501, \"aa\");")
|
||||
r = tk.MustQuery("select * from select_order_test order by name, id limit 1 offset 3;")
|
||||
rowStr = fmt.Sprintf("%v %v", 11, []byte("hh"))
|
||||
r.Check(testkit.Rows(rowStr))
|
||||
tk.MustExec("commit")
|
||||
executor.SortBufferSize = 500
|
||||
tk.MustExec("drop table select_order_test")
|
||||
}
|
||||
|
||||
|
||||
@ -486,6 +486,10 @@ func (b *planBuilder) buildLimit(src Plan, limit *ast.Limit) Plan {
|
||||
Offset: limit.Offset,
|
||||
Count: limit.Count,
|
||||
}
|
||||
if s, ok := src.(*Sort); ok {
|
||||
s.ExecLimit = li
|
||||
return s
|
||||
}
|
||||
li.SetSrc(src)
|
||||
li.SetFields(src.Fields())
|
||||
return li
|
||||
|
||||
@ -273,6 +273,8 @@ type Sort struct {
|
||||
planWithSrc
|
||||
|
||||
ByItems []*ast.ByItem
|
||||
|
||||
ExecLimit *Limit
|
||||
}
|
||||
|
||||
// Accept implements Plan Accept interface.
|
||||
|
||||
Reference in New Issue
Block a user