Files
tidb/executor/window.go

177 lines
4.9 KiB
Go

// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package executor
import (
"context"
"time"
"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/executor/aggfuncs"
"github.com/pingcap/tidb/util/chunk"
)
// WindowExec is the executor for window functions. Note that it only supports aggregation without frame clause now.
type WindowExec struct {
baseExecutor
groupChecker *groupChecker
inputIter *chunk.Iterator4Chunk
inputRow chunk.Row
groupRows []chunk.Row
childResults []*chunk.Chunk
windowFunc aggfuncs.AggFunc
partialResult aggfuncs.PartialResult
executed bool
meetNewGroup bool
remainingRowsInGroup int64
remainingRowsInChunk int
}
// Close implements the Executor Close interface.
func (e *WindowExec) Close() error {
e.childResults = nil
return errors.Trace(e.baseExecutor.Close())
}
// Next implements the Executor Next interface.
func (e *WindowExec) Next(ctx context.Context, chk *chunk.RecordBatch) error {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("windowExec.Next", opentracing.ChildOf(span.Context()))
defer span1.Finish()
}
if e.runtimeStats != nil {
start := time.Now()
defer func() { e.runtimeStats.Record(time.Now().Sub(start), chk.NumRows()) }()
}
chk.Reset()
if e.meetNewGroup && e.remainingRowsInGroup > 0 {
err := e.appendResult2Chunk(chk.Chunk)
if err != nil {
return err
}
}
for !e.executed && (chk.NumRows() == 0 || e.remainingRowsInChunk > 0) {
err := e.consumeOneGroup(ctx, chk.Chunk)
if err != nil {
e.executed = true
return errors.Trace(err)
}
}
return nil
}
func (e *WindowExec) consumeOneGroup(ctx context.Context, chk *chunk.Chunk) error {
var err error
if err = e.fetchChildIfNecessary(ctx, chk); err != nil {
return errors.Trace(err)
}
for ; e.inputRow != e.inputIter.End(); e.inputRow = e.inputIter.Next() {
e.meetNewGroup, err = e.groupChecker.meetNewGroup(e.inputRow)
if err != nil {
return errors.Trace(err)
}
if e.meetNewGroup {
err := e.consumeGroupRows()
if err != nil {
return errors.Trace(err)
}
err = e.appendResult2Chunk(chk)
if err != nil {
return errors.Trace(err)
}
}
e.groupRows = append(e.groupRows, e.inputRow)
if e.meetNewGroup {
e.inputRow = e.inputIter.Next()
return nil
}
}
return nil
}
func (e *WindowExec) consumeGroupRows() error {
if len(e.groupRows) == 0 {
return nil
}
err := e.windowFunc.UpdatePartialResult(e.ctx, e.groupRows, e.partialResult)
if err != nil {
return errors.Trace(err)
}
e.remainingRowsInGroup += int64(len(e.groupRows))
e.groupRows = e.groupRows[:0]
return nil
}
func (e *WindowExec) fetchChildIfNecessary(ctx context.Context, chk *chunk.Chunk) (err error) {
if e.inputIter != nil && e.inputRow != e.inputIter.End() {
return nil
}
// Before fetching a new batch of input, we should consume the last group rows.
err = e.consumeGroupRows()
if err != nil {
return errors.Trace(err)
}
childResult := e.children[0].newFirstChunk()
err = e.children[0].Next(ctx, &chunk.RecordBatch{Chunk: childResult})
if err != nil {
return errors.Trace(err)
}
e.childResults = append(e.childResults, childResult)
// No more data.
if childResult.NumRows() == 0 {
e.executed = true
err = e.appendResult2Chunk(chk)
return errors.Trace(err)
}
e.inputIter = chunk.NewIterator4Chunk(childResult)
e.inputRow = e.inputIter.Begin()
return nil
}
// appendResult2Chunk appends result of the window function to the result chunk.
func (e *WindowExec) appendResult2Chunk(chk *chunk.Chunk) error {
e.copyChk(chk)
for e.remainingRowsInGroup > 0 && e.remainingRowsInChunk > 0 {
// TODO: We can extend the agg func interface to avoid the `for` loop here.
err := e.windowFunc.AppendFinalResult2Chunk(e.ctx, e.partialResult, chk)
if err != nil {
return err
}
e.remainingRowsInGroup--
e.remainingRowsInChunk--
}
if e.remainingRowsInGroup == 0 {
e.windowFunc.ResetPartialResult(e.partialResult)
}
return nil
}
func (e *WindowExec) copyChk(chk *chunk.Chunk) {
if len(e.childResults) == 0 || chk.NumRows() > 0 {
return
}
childResult := e.childResults[0]
e.childResults = e.childResults[1:]
e.remainingRowsInChunk = childResult.NumRows()
columns := e.Schema().Columns[:len(e.Schema().Columns)-1]
for i, col := range columns {
chk.MakeRefTo(i, childResult, col.Index)
}
}