Pass the query string in the context, and disallow preparing DDL statements, so we can be sure that a DDL query contains no parameter markers. Change the binlog row encoding format: include the column ID in each binlog row so it can be adapted to different schema versions, and prepend the old row to an update binlog when the table has no primary key.
2014 lines
45 KiB
Go
2014 lines
45 KiB
Go
// Copyright 2015 PingCAP, Inc.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package executor
|
|
|
|
import (
|
|
"container/heap"
|
|
"sort"
|
|
"sync"
|
|
|
|
"github.com/juju/errors"
|
|
"github.com/pingcap/tidb/ast"
|
|
"github.com/pingcap/tidb/context"
|
|
"github.com/pingcap/tidb/evaluator"
|
|
"github.com/pingcap/tidb/expression"
|
|
"github.com/pingcap/tidb/infoschema"
|
|
"github.com/pingcap/tidb/inspectkv"
|
|
"github.com/pingcap/tidb/kv"
|
|
"github.com/pingcap/tidb/model"
|
|
"github.com/pingcap/tidb/plan"
|
|
"github.com/pingcap/tidb/sessionctx"
|
|
"github.com/pingcap/tidb/sessionctx/db"
|
|
"github.com/pingcap/tidb/sessionctx/forupdate"
|
|
"github.com/pingcap/tidb/table"
|
|
"github.com/pingcap/tidb/terror"
|
|
"github.com/pingcap/tidb/util/codec"
|
|
"github.com/pingcap/tidb/util/distinct"
|
|
"github.com/pingcap/tidb/util/types"
|
|
)
|
|
|
|
// Compile-time checks that these executor types implement the Executor interface.
var (
	_ Executor = &CheckTableExec{}
	_ Executor = &LimitExec{}
	_ Executor = &SelectLockExec{}
	_ Executor = &ShowDDLExec{}
	_ Executor = &TableDualExec{}
)
|
|
|
|
// Error instances.
var (
	// ErrUnknownPlan is returned when a plan node has no matching executor.
	ErrUnknownPlan = terror.ClassExecutor.New(CodeUnknownPlan, "Unknown plan")
	// ErrPrepareMulti is returned when a PREPARE contains more than one statement.
	ErrPrepareMulti = terror.ClassExecutor.New(CodePrepareMulti, "Can not prepare multiple statements")
	// ErrStmtNotFound is returned when an EXECUTE/DEALLOCATE names an unknown prepared statement.
	ErrStmtNotFound = terror.ClassExecutor.New(CodeStmtNotFound, "Prepared statement not found")
	// ErrSchemaChanged is returned when the schema changed between PREPARE and EXECUTE.
	ErrSchemaChanged = terror.ClassExecutor.New(CodeSchemaChanged, "Schema has changed")
	// ErrWrongParamCount is returned when EXECUTE supplies a wrong number of parameters.
	ErrWrongParamCount = terror.ClassExecutor.New(CodeWrongParamCount, "Wrong parameter count")
	// ErrRowKeyCount is returned when a row carries an unexpected number of row-key entries.
	ErrRowKeyCount = terror.ClassExecutor.New(CodeRowKeyCount, "Wrong row key entry count")
	// ErrPrepareDDL is returned when a PREPARE contains a DDL statement, which is disallowed.
	ErrPrepareDDL = terror.ClassExecutor.New(CodePrepareDDL, "Can not prepare DDL statements")
)

// Error codes.
const (
	CodeUnknownPlan     terror.ErrCode = 1
	CodePrepareMulti    terror.ErrCode = 2
	CodeStmtNotFound    terror.ErrCode = 3
	CodeSchemaChanged   terror.ErrCode = 4
	CodeWrongParamCount terror.ErrCode = 5
	CodeRowKeyCount     terror.ErrCode = 6
	CodePrepareDDL      terror.ErrCode = 7
)
|
|
|
|
// Row represents a record row.
type Row struct {
	// Data is the output record data for current Plan.
	Data []types.Datum

	// RowKeys identifies the stored table rows this result row was built from;
	// it may be empty for rows that do not map back to stored records.
	RowKeys []*RowKeyEntry
}

// RowKeyEntry is designed for Update/Delete statement in multi-table mode,
// we should know which table this row comes from.
type RowKeyEntry struct {
	// The table which this row come from.
	Tbl table.Table
	// Row key.
	Handle int64
	// Table alias name.
	TableAsName *model.CIStr
}
|
|
|
|
// Executor executes a query.
type Executor interface {
	// Fields returns the result fields associated with this executor's output;
	// implementations that do not expose fields return nil.
	Fields() []*ast.ResultField
	// Next returns the next result row, or (nil, nil) when iteration is exhausted.
	Next() (*Row, error)
	// Close releases resources and resets state so the executor may be reused.
	Close() error
	// Schema returns the expression schema of the output rows; may be nil.
	Schema() expression.Schema
}
|
|
|
|
// ShowDDLExec represents a show DDL executor.
// It produces exactly one row describing the current DDL and background-DDL
// owner/job state, read from the storage via inspectkv.
type ShowDDLExec struct {
	fields []*ast.ResultField
	ctx    context.Context
	done   bool // set after the single result row has been returned
}

// Schema implements Executor Schema interface.
func (e *ShowDDLExec) Schema() expression.Schema {
	return nil
}

// Fields implements Executor Fields interface.
func (e *ShowDDLExec) Fields() []*ast.ResultField {
	return e.fields
}

// Next implements Executor Next interface.
// The first call returns one row with DDL and background-DDL info; all
// subsequent calls return nil.
func (e *ShowDDLExec) Next() (*Row, error) {
	if e.done {
		return nil, nil
	}

	// Read both DDL info structures within the session's current transaction.
	txn, err := e.ctx.GetTxn(false)
	if err != nil {
		return nil, errors.Trace(err)
	}

	ddlInfo, err := inspectkv.GetDDLInfo(txn)
	if err != nil {
		return nil, errors.Trace(err)
	}
	bgInfo, err := inspectkv.GetBgDDLInfo(txn)
	if err != nil {
		return nil, errors.Trace(err)
	}

	// Owner/Job may be absent; render them as empty strings in that case.
	var ddlOwner, ddlJob string
	if ddlInfo.Owner != nil {
		ddlOwner = ddlInfo.Owner.String()
	}
	if ddlInfo.Job != nil {
		ddlJob = ddlInfo.Job.String()
	}

	var bgOwner, bgJob string
	if bgInfo.Owner != nil {
		bgOwner = bgInfo.Owner.String()
	}
	if bgInfo.Job != nil {
		bgJob = bgInfo.Job.String()
	}

	row := &Row{}
	row.Data = types.MakeDatums(
		ddlInfo.SchemaVer,
		ddlOwner,
		ddlJob,
		bgInfo.SchemaVer,
		bgOwner,
		bgJob,
	)
	// Bind each value to its result field so expressions over the fields
	// observe this row's values.
	for i, f := range e.fields {
		f.Expr.SetValue(row.Data[i].GetValue())
	}
	e.done = true

	return row, nil
}

// Close implements Executor Close interface.
func (e *ShowDDLExec) Close() error {
	return nil
}
|
|
|
|
// CheckTableExec represents a check table executor.
// It verifies that every index of each requested table is consistent with the
// table data. It produces no result rows; inconsistencies surface as errors.
type CheckTableExec struct {
	tables []*ast.TableName
	ctx    context.Context
	done   bool // set after all tables have been checked once
}

// Schema implements Executor Schema interface.
func (e *CheckTableExec) Schema() expression.Schema {
	return nil
}

// Fields implements Executor Fields interface.
func (e *CheckTableExec) Fields() []*ast.ResultField {
	return nil
}

// Next implements Executor Next interface.
// It checks all tables on the first call and always returns a nil row.
func (e *CheckTableExec) Next() (*Row, error) {
	if e.done {
		return nil, nil
	}

	// Table names are resolved against the session's current schema.
	dbName := model.NewCIStr(db.GetCurrentSchema(e.ctx))
	is := sessionctx.GetDomain(e.ctx).InfoSchema()

	for _, t := range e.tables {
		tb, err := is.TableByName(dbName, t.Name)
		if err != nil {
			return nil, errors.Trace(err)
		}
		for _, idx := range tb.Indices() {
			txn, err := e.ctx.GetTxn(false)
			if err != nil {
				return nil, errors.Trace(err)
			}
			// Compare index entries against the table records; any mismatch
			// is reported together with the table name.
			err = inspectkv.CompareIndexData(txn, tb, idx)
			if err != nil {
				return nil, errors.Errorf("%v err:%v", t.Name, err)
			}
		}
	}
	e.done = true

	return nil, nil
}

// Close implements plan.Plan Close interface.
func (e *CheckTableExec) Close() error {
	return nil
}
|
|
|
|
// FilterExec represents a filter executor.
// It pulls rows from Src and forwards only those for which Condition
// evaluates to true.
type FilterExec struct {
	Src       Executor
	Condition ast.ExprNode
	ctx       context.Context
}

// Schema implements Executor Schema interface.
func (e *FilterExec) Schema() expression.Schema {
	return e.Src.Schema()
}

// Fields implements Executor Fields interface.
func (e *FilterExec) Fields() []*ast.ResultField {
	return e.Src.Fields()
}

// Next implements Executor Next interface.
func (e *FilterExec) Next() (*Row, error) {
	for {
		srcRow, err := e.Src.Next()
		if err != nil {
			return nil, errors.Trace(err)
		}
		if srcRow == nil {
			return nil, nil
		}
		// NOTE(review): the condition is evaluated without passing srcRow —
		// presumably Src binds the current row's values into the result
		// fields the condition references; confirm against the AST evaluator.
		match, err := evaluator.EvalBool(e.ctx, e.Condition)
		if err != nil {
			return nil, errors.Trace(err)
		}
		if match {
			return srcRow, nil
		}
	}
}

// Close implements Executor Close interface.
func (e *FilterExec) Close() error {
	return e.Src.Close()
}
|
|
|
|
// SelectLockExec represents a select lock executor.
// For SELECT ... FOR UPDATE it locks every table row referenced by each
// result row as the rows stream through.
type SelectLockExec struct {
	Src    Executor
	Lock   ast.SelectLockType
	ctx    context.Context
	schema expression.Schema
}

// Schema implements Executor Schema interface.
func (e *SelectLockExec) Schema() expression.Schema {
	return e.schema
}

// Fields implements Executor Fields interface.
func (e *SelectLockExec) Fields() []*ast.ResultField {
	return e.Src.Fields()
}

// Next implements Executor Next interface.
func (e *SelectLockExec) Next() (*Row, error) {
	row, err := e.Src.Next()
	if err != nil {
		return nil, errors.Trace(err)
	}
	if row == nil {
		return nil, nil
	}
	// Only rows that map back to stored records (non-empty RowKeys) can be
	// locked, and only when the statement asked FOR UPDATE.
	if len(row.RowKeys) != 0 && e.Lock == ast.SelectLockForUpdate {
		// Mark the session as being in a FOR UPDATE statement.
		forupdate.SetForUpdate(e.ctx)
		for _, k := range row.RowKeys {
			err = k.Tbl.LockRow(e.ctx, k.Handle, true)
			if err != nil {
				return nil, errors.Trace(err)
			}
		}
	}
	return row, nil
}

// Close implements Executor Close interface.
func (e *SelectLockExec) Close() error {
	return e.Src.Close()
}
|
|
|
|
// LimitExec represents limit executor
// It skips the first Offset rows from Src and then returns at most Count rows.
type LimitExec struct {
	Src    Executor
	Offset uint64
	Count  uint64
	Idx    uint64 // number of source rows consumed so far
	schema expression.Schema
}

// Schema implements Executor Schema interface.
func (e *LimitExec) Schema() expression.Schema {
	return e.schema
}

// Fields implements Executor Fields interface.
func (e *LimitExec) Fields() []*ast.ResultField {
	return e.Src.Fields()
}

// Next implements Executor Next interface.
func (e *LimitExec) Next() (*Row, error) {
	// Skip rows until the offset is consumed.
	for e.Idx < e.Offset {
		srcRow, err := e.Src.Next()
		if err != nil {
			return nil, errors.Trace(err)
		}
		if srcRow == nil {
			return nil, nil
		}
		e.Idx++
	}
	// The window [Offset, Offset+Count) has been fully emitted.
	if e.Idx >= e.Count+e.Offset {
		return nil, nil
	}
	srcRow, err := e.Src.Next()
	if err != nil {
		return nil, errors.Trace(err)
	}
	if srcRow == nil {
		return nil, nil
	}
	e.Idx++
	return srcRow, nil
}

// Close implements Executor Close interface.
func (e *LimitExec) Close() error {
	// Reset the cursor so the executor can run again.
	e.Idx = 0
	return e.Src.Close()
}
|
|
|
|
// orderByRow binds a row to its order values, so it can be sorted.
type orderByRow struct {
	// key holds the evaluated ORDER BY expressions for row.
	key []types.Datum
	row *Row
}

// For select stmt with aggregate function but without groupby clasue,
// We consider there is a single group with key singleGroup.
var singleGroup = []byte("SingleGroup")
|
|
|
|
// DistinctExec represents Distinct executor.
|
|
type DistinctExec struct {
|
|
Src Executor
|
|
checker *distinct.Checker
|
|
schema expression.Schema
|
|
}
|
|
|
|
// Schema implements Executor Schema interface.
|
|
func (e *DistinctExec) Schema() expression.Schema {
|
|
return e.schema
|
|
}
|
|
|
|
// Fields implements Executor Fields interface.
|
|
func (e *DistinctExec) Fields() []*ast.ResultField {
|
|
return e.Src.Fields()
|
|
}
|
|
|
|
// Next implements Executor Next interface.
|
|
func (e *DistinctExec) Next() (*Row, error) {
|
|
if e.checker == nil {
|
|
e.checker = distinct.CreateDistinctChecker()
|
|
}
|
|
for {
|
|
row, err := e.Src.Next()
|
|
if err != nil {
|
|
return nil, errors.Trace(err)
|
|
}
|
|
if row == nil {
|
|
return nil, nil
|
|
}
|
|
ok, err := e.checker.Check(types.DatumsToInterfaces(row.Data))
|
|
if err != nil {
|
|
return nil, errors.Trace(err)
|
|
}
|
|
if !ok {
|
|
continue
|
|
}
|
|
return row, nil
|
|
}
|
|
}
|
|
|
|
// Close implements Executor Close interface.
|
|
func (e *DistinctExec) Close() error {
|
|
return e.Src.Close()
|
|
}
|
|
|
|
// ReverseExec produces reverse ordered result, it is used to wrap executors that do not support reverse scan.
// It materializes all of Src's rows on the first Next call, then replays
// them from last to first.
type ReverseExec struct {
	Src    Executor
	rows   []*Row
	cursor int  // index of the next row to return, moving backwards
	done   bool // set once Src has been fully drained into rows
}

// Schema implements Executor Schema interface.
func (e *ReverseExec) Schema() expression.Schema {
	return e.Src.Schema()
}

// Fields implements Executor Fields interface.
func (e *ReverseExec) Fields() []*ast.ResultField {
	return e.Src.Fields()
}

// Next implements Executor Next interface.
func (e *ReverseExec) Next() (*Row, error) {
	if !e.done {
		// Drain the source completely before emitting anything.
		for {
			row, err := e.Src.Next()
			if err != nil {
				return nil, errors.Trace(err)
			}
			if row == nil {
				break
			}
			e.rows = append(e.rows, row)
		}
		e.cursor = len(e.rows) - 1
		e.done = true
	}
	if e.cursor < 0 {
		return nil, nil
	}
	row := e.rows[e.cursor]
	e.cursor--
	// Rebind the emitted row's values into the source's result fields so
	// downstream field-based evaluation sees this row.
	for i, field := range e.Src.Fields() {
		field.Expr.SetDatum(row.Data[i])
	}
	return row, nil
}

// Close implements Executor Close interface.
func (e *ReverseExec) Close() error {
	return e.Src.Close()
}
|
|
|
|
// init installs the executor-based subquery evaluator into the plan package,
// breaking the import cycle between plan building and execution.
func init() {
	plan.EvalSubquery = func(p plan.PhysicalPlan, is infoschema.InfoSchema, ctx context.Context) (d []types.Datum, err error) {
		// Build an executor for the subquery plan and fetch its first row.
		e := &executorBuilder{is: is, ctx: ctx}
		exec := e.build(p)
		row, err := exec.Next()
		if err != nil {
			return d, errors.Trace(err)
		}
		if row == nil {
			// Empty result: return the nil datum slice.
			return
		}
		return row.Data, nil
	}
}
|
|
|
|
// HashJoinExec implements the hash join algorithm.
// The small side is fully hashed into hashTable; big-side rows are then
// streamed in batches to `concurrency` worker goroutines that probe the
// table and emit joined rows on resultRows.
type HashJoinExec struct {
	hashTable    map[string][]*Row
	smallHashKey []*expression.Column
	bigHashKey   []*expression.Column
	smallExec    Executor
	bigExec      Executor
	prepared     bool
	ctx          context.Context
	smallFilter  expression.Expression
	bigFilter    expression.Expression
	otherFilter  expression.Expression
	schema       expression.Schema
	outer        bool // outer join: emit null-padded rows for unmatched big rows
	leftSmall    bool // small table is the left operand of the join
	cursor       int
	// targetTypes means the target the type that both smallHashKey and bigHashKey should convert to.
	targetTypes []*types.FieldType

	finished bool
	// for sync multiple join workers.
	wg sync.WaitGroup

	// Concurrent channels.
	concurrency  int
	bigTableRows []chan []*Row // one batch channel per worker, filled round-robin
	bigTableErr  chan error
	hashJoinContexts []*hashJoinCtx

	// Channels for output.
	resultErr  chan error
	resultRows chan *Row
}

// hashJoinCtx holds per-worker scratch state so workers do not contend.
type hashJoinCtx struct {
	bigFilter   expression.Expression
	otherFilter expression.Expression
	// Buffer used for encode hash keys.
	datumBuffer   []types.Datum
	hashKeyBuffer []byte
}
|
|
|
|
// Close implements Executor Close interface.
// bigExec is closed by the fetch worker's deferred cleanup, so only the
// small-side executor is closed here.
func (e *HashJoinExec) Close() error {
	e.prepared = false
	e.cursor = 0
	return e.smallExec.Close()
}
|
|
|
|
func joinTwoRow(a *Row, b *Row) *Row {
|
|
ret := &Row{
|
|
RowKeys: make([]*RowKeyEntry, 0, len(a.RowKeys)+len(b.RowKeys)),
|
|
Data: make([]types.Datum, 0, len(a.Data)+len(b.Data)),
|
|
}
|
|
ret.RowKeys = append(ret.RowKeys, a.RowKeys...)
|
|
ret.RowKeys = append(ret.RowKeys, b.RowKeys...)
|
|
ret.Data = append(ret.Data, a.Data...)
|
|
ret.Data = append(ret.Data, b.Data...)
|
|
return ret
|
|
}
|
|
|
|
// getHashKey gets the hash key when given a row and hash columns.
// It will return a boolean value representing if the hash key has null, a byte slice representing the result hash code.
// vals is caller-supplied scratch storage (one slot per column); bytes is an
// optional caller-supplied buffer the encoded key is appended to.
func getHashKey(cols []*expression.Column, row *Row, targetTypes []*types.FieldType, vals []types.Datum, bytes []byte) (bool, []byte, error) {
	var err error
	for i, col := range cols {
		vals[i], err = col.Eval(row.Data, nil)
		if err != nil {
			return false, nil, errors.Trace(err)
		}
		// A NULL in any key column means the row can never hash-match.
		if vals[i].IsNull() {
			return true, nil, nil
		}
		// Normalize both sides to the common target type so equal values
		// from differently-typed columns encode identically.
		if targetTypes[i].Tp != col.RetType.Tp {
			vals[i], err = vals[i].ConvertTo(targetTypes[i])
			if err != nil {
				return false, nil, errors.Trace(err)
			}
		}
	}
	if len(vals) == 0 {
		return false, nil, nil
	}
	bytes, err = codec.EncodeValue(bytes, vals...)
	return false, bytes, errors.Trace(err)
}
|
|
|
|
// Schema implements Executor Schema interface.
func (e *HashJoinExec) Schema() expression.Schema {
	return e.schema
}

// Fields implements Executor Fields interface.
func (e *HashJoinExec) Fields() []*ast.ResultField {
	return nil
}

// batchSize caps how many big-table rows are fetched and handed to a join
// worker in one batch.
var batchSize = 128
|
|
|
|
// Worker to get big table rows.
// Batches of big-side rows are distributed round-robin over the per-worker
// channels. On exit (error, exhaustion, or e.finished) all batch channels are
// closed so join workers terminate, and the big-side executor is closed.
func (e *HashJoinExec) fetchBigExec() {
	cnt := 0
	defer func() {
		for _, cn := range e.bigTableRows {
			close(cn)
		}
		e.bigExec.Close()
	}()
	// Start with small batches so first results appear quickly, then grow
	// the batch size up to batchSize for throughput.
	curBatchSize := 1
	for {
		if e.finished {
			break
		}
		rows := make([]*Row, 0, batchSize)
		done := false
		for i := 0; i < curBatchSize; i++ {
			row, err := e.bigExec.Next()
			if err != nil {
				// Report the error to the join workers and stop fetching.
				e.bigTableErr <- errors.Trace(err)
				done = true
				break
			}
			if row == nil {
				done = true
				break
			}
			rows = append(rows, row)
		}
		// Round-robin dispatch; the (possibly partial) final batch is still sent.
		idx := cnt % e.concurrency
		e.bigTableRows[idx] <- rows
		cnt++
		if done {
			break
		}
		if curBatchSize < batchSize {
			curBatchSize *= 2
		}
	}
}
|
|
|
|
// prepare builds the hash table from the small-side executor, then starts the
// big-side fetch worker, `concurrency` join workers, and the worker that
// closes resultRows once all joins finish.
func (e *HashJoinExec) prepare() error {
	e.finished = false
	e.bigTableRows = make([]chan []*Row, e.concurrency)
	for i := 0; i < e.concurrency; i++ {
		e.bigTableRows[i] = make(chan []*Row, e.concurrency*batchSize)
	}
	e.bigTableErr = make(chan error, 1)

	// Start a worker to fetch big table rows.
	go e.fetchBigExec()

	e.hashTable = make(map[string][]*Row)
	e.cursor = 0
	for {
		row, err := e.smallExec.Next()
		if err != nil {
			return errors.Trace(err)
		}
		if row == nil {
			e.smallExec.Close()
			break
		}

		// Apply the small-side filter before hashing; filtered-out rows
		// never enter the table.
		matched := true
		if e.smallFilter != nil {
			matched, err = expression.EvalBool(e.smallFilter, row.Data, e.ctx)
			if err != nil {
				return errors.Trace(err)
			}
			if !matched {
				continue
			}
		}
		// Safe to reuse worker 0's scratch buffer: the join workers have not
		// been started yet at this point.
		hasNull, hashcode, err := getHashKey(e.smallHashKey, row, e.targetTypes, e.hashJoinContexts[0].datumBuffer, nil)
		if err != nil {
			return errors.Trace(err)
		}
		if hasNull {
			// NULL keys can never equal anything; skip.
			continue
		}
		if rows, ok := e.hashTable[string(hashcode)]; !ok {
			e.hashTable[string(hashcode)] = []*Row{row}
		} else {
			e.hashTable[string(hashcode)] = append(rows, row)
		}
	}

	e.resultRows = make(chan *Row, e.concurrency*1000)
	e.resultErr = make(chan error, 1)

	e.wg = sync.WaitGroup{}
	for i := 0; i < e.concurrency; i++ {
		e.wg.Add(1)
		go e.doJoin(i)
	}
	go e.closeChanWorker()

	e.prepared = true
	return nil
}
|
|
|
|
// closeChanWorker waits for every join worker to finish, then closes the
// result channel (signalling end-of-results to Next) and releases the hash
// table for garbage collection.
func (e *HashJoinExec) closeChanWorker() {
	e.wg.Wait()
	close(e.resultRows)
	e.hashTable = nil
}
|
|
|
|
// doJoin does join job in one goroutine.
// Worker idx consumes batches from its dedicated channel and probes the
// shared (read-only at this point) hash table. It exits on fetch error,
// channel close, cancellation via e.finished, or a failed join.
func (e *HashJoinExec) doJoin(idx int) {
	for {
		var (
			bigRows []*Row
			ok      bool
			err     error
		)
		select {
		case bigRows, ok = <-e.bigTableRows[idx]:
		case err = <-e.bigTableErr:
		}
		if err != nil {
			// Propagate the fetch error to Next and stop this worker.
			e.resultErr <- errors.Trace(err)
			break
		}
		if !ok || e.finished {
			break
		}
		for _, bigRow := range bigRows {
			succ := e.join(e.hashJoinContexts[idx], bigRow)
			if !succ {
				// join already reported its error on resultErr.
				break
			}
		}
	}
	e.wg.Done()
}
|
|
|
|
// join matches one big-table row against the hash table using the worker's
// scratch context, sending every joined row (or a null-padded row for outer
// joins with no match) to resultRows. It returns false after reporting an
// error on resultErr.
func (e *HashJoinExec) join(ctx *hashJoinCtx, bigRow *Row) bool {
	var (
		matchedRows []*Row
		err         error
	)
	// First apply the big-side filter; rows it rejects can still produce a
	// null-padded row when the join is outer.
	bigMatched := true
	if e.bigFilter != nil {
		bigMatched, err = expression.EvalBool(ctx.bigFilter, bigRow.Data, e.ctx)
		if err != nil {
			e.resultErr <- errors.Trace(err)
			return false
		}
	}
	if bigMatched {
		matchedRows, err = e.constructMatchedRows(ctx, bigRow)
		if err != nil {
			e.resultErr <- errors.Trace(err)
			return false
		}
	}
	for _, r := range matchedRows {
		e.resultRows <- r
	}
	if len(matchedRows) == 0 && e.outer {
		// Outer join: emit the big row padded with NULLs for the small side.
		r := e.fillNullRow(bigRow)
		e.resultRows <- r
	}
	return true
}
|
|
|
|
// constructMatchedRows probes the hash table with bigRow's key and returns
// all joined rows that also satisfy otherFilter. A NULL in the key or a
// missing bucket yields no matches.
func (e *HashJoinExec) constructMatchedRows(ctx *hashJoinCtx, bigRow *Row) (matchedRows []*Row, err error) {
	// Reuse the worker's key buffer (re-sliced to zero length, capped at its
	// capacity) to avoid per-row allocations.
	hasNull, hashcode, err := getHashKey(e.bigHashKey, bigRow, e.targetTypes, ctx.datumBuffer, ctx.hashKeyBuffer[0:0:cap(ctx.hashKeyBuffer)])
	if err != nil {
		return nil, errors.Trace(err)
	}

	if hasNull {
		return
	}
	rows, ok := e.hashTable[string(hashcode)]
	if !ok {
		return
	}
	// match eq condition
	for _, smallRow := range rows {
		otherMatched := true
		var matchedRow *Row
		// Column order in the joined row follows the join's operand order.
		if e.leftSmall {
			matchedRow = joinTwoRow(smallRow, bigRow)
		} else {
			matchedRow = joinTwoRow(bigRow, smallRow)
		}
		if e.otherFilter != nil {
			otherMatched, err = expression.EvalBool(ctx.otherFilter, matchedRow.Data, e.ctx)
			if err != nil {
				return nil, errors.Trace(err)
			}
		}
		if otherMatched {
			matchedRows = append(matchedRows, matchedRow)
		}
	}

	return matchedRows, nil
}
|
|
|
|
func (e *HashJoinExec) fillNullRow(bigRow *Row) (returnRow *Row) {
|
|
smallRow := &Row{
|
|
Data: make([]types.Datum, len(e.smallExec.Schema())),
|
|
}
|
|
|
|
for _, data := range smallRow.Data {
|
|
data.SetNull()
|
|
}
|
|
if e.leftSmall {
|
|
returnRow = joinTwoRow(smallRow, bigRow)
|
|
} else {
|
|
returnRow = joinTwoRow(bigRow, smallRow)
|
|
}
|
|
return returnRow
|
|
}
|
|
|
|
// Next implements Executor Next interface.
// On the first call it builds the hash table and starts the worker pipeline;
// afterwards it drains joined rows from resultRows. A worker error sets
// e.finished so the fetch worker stops, and is returned to the caller.
func (e *HashJoinExec) Next() (*Row, error) {
	if !e.prepared {
		if err := e.prepare(); err != nil {
			return nil, errors.Trace(err)
		}
	}
	var (
		row *Row
		err error
		ok  bool
	)
	// Either a result row arrives, resultRows is closed (ok == false, all
	// workers done), or a worker reports an error.
	select {
	case row, ok = <-e.resultRows:
	case err, ok = <-e.resultErr:
	}
	if err != nil {
		e.finished = true
		return nil, errors.Trace(err)
	}
	if !ok {
		return nil, nil
	}
	return row, nil
}
|
|
|
|
// HashSemiJoinExec implements the hash join algorithm for semi join.
// It hashes the small (inner) side and, for each big (outer) row, decides
// whether a match exists — with three-valued NULL semantics tracked via
// smallTableHasNull — rather than producing joined rows.
type HashSemiJoinExec struct {
	hashTable    map[string][]*Row
	smallHashKey []*expression.Column
	bigHashKey   []*expression.Column
	smallExec    Executor
	bigExec      Executor
	prepared     bool
	ctx          context.Context
	smallFilter  expression.Expression
	bigFilter    expression.Expression
	otherFilter  expression.Expression
	schema       expression.Schema
	// withAux: append the matched result (true/false/NULL) to each big row
	// as an auxiliary column instead of filtering rows.
	withAux      bool
	targetTypes  []*types.FieldType
	// smallTableHasNull records that some inner key contained NULL, which
	// turns "no match" into "unknown" (NULL) for the outer row.
	smallTableHasNull bool
	// If anti is true, semi join only output the unmatched row.
	anti bool
}

// Close implements Executor Close interface.
// It resets all prepare-time state so the executor can run again.
func (e *HashSemiJoinExec) Close() error {
	e.prepared = false
	e.hashTable = make(map[string][]*Row)
	e.smallTableHasNull = false
	err := e.smallExec.Close()
	if err != nil {
		return errors.Trace(err)
	}
	return e.bigExec.Close()
}

// Schema implements Executor Schema interface.
func (e *HashSemiJoinExec) Schema() expression.Schema {
	return e.schema
}

// Fields implements Executor Fields interface.
func (e *HashSemiJoinExec) Fields() []*ast.ResultField {
	return nil
}
|
|
|
|
// prepare drains the small-side executor into the hash table, applying the
// small-side filter, and records whether any inner key contained NULL (needed
// for three-valued semi-join semantics).
func (e *HashSemiJoinExec) prepare() error {
	e.hashTable = make(map[string][]*Row)
	for {
		row, err := e.smallExec.Next()
		if err != nil {
			return errors.Trace(err)
		}
		if row == nil {
			e.smallExec.Close()
			break
		}

		matched := true
		if e.smallFilter != nil {
			matched, err = expression.EvalBool(e.smallFilter, row.Data, e.ctx)
			if err != nil {
				return errors.Trace(err)
			}
			if !matched {
				continue
			}
		}
		hasNull, hashcode, err := getHashKey(e.smallHashKey, row, e.targetTypes, make([]types.Datum, len(e.smallHashKey)), nil)
		if err != nil {
			return errors.Trace(err)
		}
		if hasNull {
			// A NULL inner key makes non-matches "unknown" rather than false.
			e.smallTableHasNull = true
			continue
		}
		if rows, ok := e.hashTable[string(hashcode)]; !ok {
			e.hashTable[string(hashcode)] = []*Row{row}
		} else {
			e.hashTable[string(hashcode)] = append(rows, row)
		}
	}

	e.prepared = true
	return nil
}
|
|
|
|
func (e *HashSemiJoinExec) rowIsMatched(bigRow *Row) (matched bool, hasNull bool, err error) {
|
|
hasNull, hashcode, err := getHashKey(e.bigHashKey, bigRow, e.targetTypes, make([]types.Datum, len(e.smallHashKey)), nil)
|
|
if err != nil {
|
|
return false, false, errors.Trace(err)
|
|
}
|
|
if hasNull {
|
|
return false, true, nil
|
|
}
|
|
rows, ok := e.hashTable[string(hashcode)]
|
|
if !ok {
|
|
return
|
|
}
|
|
// match eq condition
|
|
for _, smallRow := range rows {
|
|
matched = true
|
|
if e.otherFilter != nil {
|
|
var matchedRow *Row
|
|
matchedRow = joinTwoRow(bigRow, smallRow)
|
|
matched, err = expression.EvalBool(e.otherFilter, matchedRow.Data, e.ctx)
|
|
if err != nil {
|
|
return false, false, errors.Trace(err)
|
|
}
|
|
}
|
|
if matched {
|
|
return
|
|
}
|
|
}
|
|
return
|
|
}
|
|
|
|
// Next implements Executor Next interface.
// For each big-side row it computes the semi-join result, honoring NULL
// ("unknown") semantics and the anti flag. With withAux set, the result is
// appended as an extra column (true/false/NULL); otherwise unmatched rows
// are filtered out.
func (e *HashSemiJoinExec) Next() (*Row, error) {
	if !e.prepared {
		if err := e.prepare(); err != nil {
			return nil, errors.Trace(err)
		}
	}

	for {
		bigRow, err := e.bigExec.Next()
		if err != nil {
			return nil, errors.Trace(err)
		}
		if bigRow == nil {
			e.bigExec.Close()
			return nil, nil
		}

		matched := true
		if e.bigFilter != nil {
			matched, err = expression.EvalBool(e.bigFilter, bigRow.Data, e.ctx)
			if err != nil {
				return nil, errors.Trace(err)
			}
		}
		isNull := false
		if matched {
			matched, isNull, err = e.rowIsMatched(bigRow)
			if err != nil {
				return nil, errors.Trace(err)
			}
		}
		// No concrete match but the inner side contained a NULL key:
		// the result is unknown (NULL), not false.
		if !matched && e.smallTableHasNull {
			isNull = true
		}
		// Anti-semi join inverts a definite result; NULL stays NULL.
		if e.anti && !isNull {
			matched = !matched
		}
		if e.withAux {
			if isNull {
				bigRow.Data = append(bigRow.Data, types.NewDatum(nil))
			} else {
				bigRow.Data = append(bigRow.Data, types.NewDatum(matched))
			}
			return bigRow, nil
		} else if matched {
			return bigRow, nil
		}
	}
}
|
|
|
|
// HashAggExec deals with all the aggregate functions.
// It is built from Aggregate Plan. When Next() is called, it reads all the data from Src and updates all the items in AggFuncs.
type HashAggExec struct {
	Src          Executor
	schema       expression.Schema
	ResultFields []*ast.ResultField
	executed     bool
	hasGby       bool // statement has a GROUP BY clause
	aggType      plan.AggregationType
	ctx          context.Context
	AggFuncs     []expression.AggregationFunction
	// groupMap deduplicates group keys; groups preserves first-seen order.
	groupMap          map[string]bool
	groups            [][]byte
	currentGroupIndex int
	GroupByItems      []expression.Expression
}

// Close implements Executor Close interface.
// It resets aggregation state so the executor can run again.
func (e *HashAggExec) Close() error {
	e.executed = false
	e.groups = nil
	e.currentGroupIndex = 0
	for _, agg := range e.AggFuncs {
		agg.Clear()
	}
	return e.Src.Close()
}

// Schema implements Executor Schema interface.
func (e *HashAggExec) Schema() expression.Schema {
	return e.schema
}

// Fields implements Executor Fields interface.
func (e *HashAggExec) Fields() []*ast.ResultField {
	return e.ResultFields
}
|
|
|
|
// Next implements Executor Next interface.
// The first call consumes the entire source, accumulating every aggregate per
// group; subsequent calls emit one row per group in first-seen order.
func (e *HashAggExec) Next() (*Row, error) {
	// In this stage we consider all data from src as a single group.
	if !e.executed {
		e.groupMap = make(map[string]bool)
		for {
			hasMore, err := e.innerNext()
			if err != nil {
				return nil, errors.Trace(err)
			}
			if !hasMore {
				break
			}
		}
		e.executed = true
		if (len(e.groups) == 0) && !e.hasGby {
			// If no groupby and no data, we should add an empty group.
			// For example:
			// "select count(c) from t;" should return one row [0]
			// "select count(c) from t group by c1;" should return empty result set.
			e.groups = append(e.groups, []byte{})
		}
	}
	if e.currentGroupIndex >= len(e.groups) {
		return nil, nil
	}
	retRow := &Row{Data: make([]types.Datum, 0, len(e.AggFuncs))}
	groupKey := e.groups[e.currentGroupIndex]
	for _, af := range e.AggFuncs {
		retRow.Data = append(retRow.Data, af.GetGroupResult(groupKey))
	}
	e.currentGroupIndex++
	return retRow, nil
}
|
|
|
|
// getGroupKey computes the encoded group key for a source row.
// In the final-aggregation stage the key is precomputed by the partial stage
// and arrives as the first group-by item; otherwise it is the encoding of all
// evaluated GROUP BY expressions (or an empty key when there is no GROUP BY).
func (e *HashAggExec) getGroupKey(row *Row) ([]byte, error) {
	if e.aggType == plan.FinalAgg {
		val, err := e.GroupByItems[0].Eval(row.Data, e.ctx)
		if err != nil {
			return nil, errors.Trace(err)
		}
		return val.GetBytes(), nil
	}
	if !e.hasGby {
		// All rows fall into one implicit group.
		return []byte{}, nil
	}
	vals := make([]types.Datum, 0, len(e.GroupByItems))
	for _, item := range e.GroupByItems {
		v, err := item.Eval(row.Data, e.ctx)
		if err != nil {
			return nil, errors.Trace(err)
		}
		vals = append(vals, v)
	}
	bs, err := codec.EncodeValue([]byte{}, vals...)
	if err != nil {
		return nil, errors.Trace(err)
	}
	return bs, nil
}
|
|
|
|
// Fetch a single row from src and update each aggregate function.
// If the first return value is false, it means there is no more data from src.
func (e *HashAggExec) innerNext() (ret bool, err error) {
	var srcRow *Row
	if e.Src != nil {
		srcRow, err = e.Src.Next()
		if err != nil {
			return false, errors.Trace(err)
		}
		if srcRow == nil {
			return false, nil
		}
	} else {
		// If Src is nil, only one row should be returned.
		if e.executed {
			return false, nil
		}
	}
	e.executed = true
	// NOTE(review): on the Src == nil path srcRow stays nil, so the
	// srcRow.Data accesses below would panic — presumably the planner never
	// builds a src-less HashAggExec whose group-by items or agg funcs
	// dereference the row; confirm against the executor builder.
	groupKey, err := e.getGroupKey(srcRow)
	if err != nil {
		return false, errors.Trace(err)
	}
	// Register the group on first sight, preserving encounter order.
	if _, ok := e.groupMap[string(groupKey)]; !ok {
		e.groupMap[string(groupKey)] = true
		e.groups = append(e.groups, groupKey)
	}
	for _, af := range e.AggFuncs {
		af.Update(srcRow.Data, groupKey, e.ctx)
	}
	return true, nil
}
|
|
|
|
// StreamAggExec deals with all the aggregate functions.
// It assumes all the input datas is sorted by group by key.
// When Next() is called, it will return a result for the same group.
type StreamAggExec struct {
	Src          Executor
	schema       expression.Schema
	ResultFields []*ast.ResultField
	executed     bool // source exhausted
	hasData      bool // at least one source row was seen
	ctx          context.Context
	AggFuncs     []expression.AggregationFunction
	GroupByItems []expression.Expression
	// curGroupEncodedKey/curGroupKey hold the current group's key;
	// tmpGroupKey is per-call scratch for the candidate key.
	curGroupEncodedKey []byte
	curGroupKey        []types.Datum
	tmpGroupKey        []types.Datum
}

// Close implements Executor Close interface.
// It resets streaming state so the executor can run again.
func (e *StreamAggExec) Close() error {
	e.executed = false
	e.hasData = false
	for _, agg := range e.AggFuncs {
		agg.Clear()
	}
	return e.Src.Close()
}

// Schema implements Executor Schema interface.
func (e *StreamAggExec) Schema() expression.Schema {
	return e.schema
}

// Fields implements Executor Fields interface.
func (e *StreamAggExec) Fields() []*ast.ResultField {
	return e.ResultFields
}
|
|
|
|
// Next implements Executor Next interface.
// Relying on group-sorted input, it streams source rows into the aggregate
// functions and emits one result row each time the group-by key changes (or
// the source ends).
func (e *StreamAggExec) Next() (*Row, error) {
	if e.executed {
		return nil, nil
	}
	retRow := &Row{Data: make([]types.Datum, 0, len(e.AggFuncs))}
	for {
		row, err := e.Src.Next()
		if err != nil {
			return nil, errors.Trace(err)
		}
		var newGroup bool
		if row == nil {
			// End of input closes the final group.
			newGroup = true
			e.executed = true
		} else {
			e.hasData = true
			newGroup, err = e.meetNewGroup(row)
			if err != nil {
				return nil, errors.Trace(err)
			}
		}
		if newGroup {
			// Snapshot the finished group's results before the current row
			// is folded into the next group below.
			for _, af := range e.AggFuncs {
				retRow.Data = append(retRow.Data, af.GetStreamResult())
			}
		}
		if e.executed {
			break
		}
		// Fold the current row into the (possibly new) group's aggregates.
		for _, af := range e.AggFuncs {
			err = af.StreamUpdate(row.Data, e.ctx)
			if err != nil {
				return nil, errors.Trace(err)
			}
		}
		if newGroup {
			break
		}
	}
	// GROUP BY over an empty input produces no rows.
	if !e.hasData && len(e.GroupByItems) > 0 {
		return nil, nil
	}
	return retRow, nil
}
|
|
|
|
// meetNewGroup returns a value that represents if the new group is different from last group.
// It evaluates the GROUP BY items for row, compares them against the stored
// key of the current group, and on a change adopts the new key (and its
// encoded form). The very first group is not reported as "new".
func (e *StreamAggExec) meetNewGroup(row *Row) (bool, error) {
	if len(e.GroupByItems) == 0 {
		// No GROUP BY: all rows belong to a single group.
		return false, nil
	}
	e.tmpGroupKey = e.tmpGroupKey[:0]
	matched, firstGroup := true, false
	if len(e.curGroupKey) == 0 {
		matched, firstGroup = false, true
	}
	for i, item := range e.GroupByItems {
		v, err := item.Eval(row.Data, e.ctx)
		if err != nil {
			return false, errors.Trace(err)
		}
		if matched {
			// NOTE(review): curGroupKey may alias tmpGroupKey's backing
			// array (see the assignment below). This is safe only because
			// index i is compared here BEFORE the append below overwrites
			// that same backing slot — keep this ordering.
			c, err := v.CompareDatum(e.curGroupKey[i])
			if err != nil {
				return false, errors.Trace(err)
			}
			matched = c == 0
		}
		e.tmpGroupKey = append(e.tmpGroupKey, v)
	}
	if matched {
		// Same group as before.
		return false, nil
	}
	// Adopt the freshly built key as the current group key.
	e.curGroupKey = e.tmpGroupKey
	var err error
	// Re-encode into the existing buffer, reusing its capacity.
	e.curGroupEncodedKey, err = codec.EncodeValue(e.curGroupEncodedKey[0:0:cap(e.curGroupEncodedKey)], e.curGroupKey...)
	if err != nil {
		return false, errors.Trace(err)
	}
	return !firstGroup, nil
}
|
|
|
|
// ProjectionExec represents a select fields executor.
type ProjectionExec struct {
	Src          Executor            // child executor; may be nil for a source-less projection (e.g. SELECT 1)
	ResultFields []*ast.ResultField  // result fields exposed via Fields()
	schema       expression.Schema   // output schema
	executed     bool                // set once the single row has been produced when Src is nil
	ctx          context.Context
	exprs        []expression.Expression // projection expressions, one per output column
}
|
|
|
|
// Schema implements Executor Schema interface.
func (e *ProjectionExec) Schema() expression.Schema {
	return e.schema
}
|
|
|
|
// Fields implements Executor Fields interface.
func (e *ProjectionExec) Fields() []*ast.ResultField {
	return e.ResultFields
}
|
|
|
|
// Next implements Executor Next interface.
|
|
func (e *ProjectionExec) Next() (retRow *Row, err error) {
|
|
var rowKeys []*RowKeyEntry
|
|
var srcRow *Row
|
|
if e.Src != nil {
|
|
srcRow, err = e.Src.Next()
|
|
if err != nil {
|
|
return nil, errors.Trace(err)
|
|
}
|
|
if srcRow == nil {
|
|
return nil, nil
|
|
}
|
|
rowKeys = srcRow.RowKeys
|
|
} else {
|
|
// If Src is nil, only one row should be returned.
|
|
if e.executed {
|
|
return nil, nil
|
|
}
|
|
}
|
|
e.executed = true
|
|
row := &Row{
|
|
RowKeys: rowKeys,
|
|
Data: make([]types.Datum, 0, len(e.exprs)),
|
|
}
|
|
for _, expr := range e.exprs {
|
|
val, err := expr.Eval(srcRow.Data, e.ctx)
|
|
if err != nil {
|
|
return nil, errors.Trace(err)
|
|
}
|
|
row.Data = append(row.Data, val)
|
|
}
|
|
return row, nil
|
|
}
|
|
|
|
// Close implements Executor Close interface.
|
|
func (e *ProjectionExec) Close() error {
|
|
if e.Src != nil {
|
|
return e.Src.Close()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// TableDualExec represents a dual table executor.
// It produces exactly one empty row per execution.
type TableDualExec struct {
	schema   expression.Schema
	executed bool // set once the single row has been returned
}
|
|
|
|
// Init implements Executor Init interface.
// It resets the executor so Next can produce its row again.
func (e *TableDualExec) Init() {
	e.executed = false
}
|
|
|
|
// Schema implements Executor Schema interface.
func (e *TableDualExec) Schema() expression.Schema {
	return e.schema
}
|
|
|
|
// Fields implements Executor Fields interface.
// Dual has no result fields.
func (e *TableDualExec) Fields() []*ast.ResultField {
	return nil
}
|
|
|
|
// Next implements Executor Next interface.
|
|
func (e *TableDualExec) Next() (*Row, error) {
|
|
if e.executed {
|
|
return nil, nil
|
|
}
|
|
e.executed = true
|
|
return &Row{}, nil
|
|
}
|
|
|
|
// Close implements Executor interface.
// Nothing to release.
func (e *TableDualExec) Close() error {
	return nil
}
|
|
|
|
// SelectionExec represents a filter executor.
// It forwards only the rows of Src for which Condition evaluates to true.
type SelectionExec struct {
	Src       Executor
	Condition expression.Expression // filter predicate evaluated against each source row
	ctx       context.Context
	schema    expression.Schema
}
|
|
|
|
// Schema implements Executor Schema interface.
func (e *SelectionExec) Schema() expression.Schema {
	return e.schema
}
|
|
|
|
// Fields implements Executor Fields interface.
// Selection does not change the shape of rows, so it exposes Src's fields.
func (e *SelectionExec) Fields() []*ast.ResultField {
	return e.Src.Fields()
}
|
|
|
|
// Next implements Executor Next interface.
|
|
func (e *SelectionExec) Next() (*Row, error) {
|
|
for {
|
|
srcRow, err := e.Src.Next()
|
|
if err != nil {
|
|
return nil, errors.Trace(err)
|
|
}
|
|
if srcRow == nil {
|
|
return nil, nil
|
|
}
|
|
match, err := expression.EvalBool(e.Condition, srcRow.Data, e.ctx)
|
|
if err != nil {
|
|
return nil, errors.Trace(err)
|
|
}
|
|
if match {
|
|
return srcRow, nil
|
|
}
|
|
}
|
|
}
|
|
|
|
// Close implements Executor Close interface.
func (e *SelectionExec) Close() error {
	return e.Src.Close()
}
|
|
|
|
// TableScanExec is a table scan executor without result fields.
type TableScanExec struct {
	t          table.Table
	asName     *model.CIStr      // table alias recorded on each returned row key
	ctx        context.Context
	ranges     []plan.TableRange // sorted handle ranges to scan; see Next/seekRange
	seekHandle int64             // next handle to seek to within the current range
	iter       kv.Iterator
	cursor     int               // index of the current range in ranges
	schema     expression.Schema
	columns    []*model.ColumnInfo // columns to fetch for each row
}
|
|
|
|
// Schema implements Executor Schema interface.
func (e *TableScanExec) Schema() expression.Schema {
	return e.schema
}
|
|
|
|
// Fields implements Executor interface.
func (e *TableScanExec) Fields() []*ast.ResultField {
	return nil
}
|
|
|
|
// Next implements Executor interface.
// It scans handles range by range: it seeks the table for the next existing
// handle at or after seekHandle, skips ranges that cannot contain it, and
// returns the matching row. seekHandle/cursor persist across calls.
func (e *TableScanExec) Next() (*Row, error) {
	for {
		if e.cursor >= len(e.ranges) {
			// All ranges consumed.
			return nil, nil
		}
		ran := e.ranges[e.cursor]
		if e.seekHandle < ran.LowVal {
			// Clamp the seek position to the current range's lower bound.
			e.seekHandle = ran.LowVal
		}
		if e.seekHandle > ran.HighVal {
			// Current range exhausted; move to the next one.
			e.cursor++
			continue
		}
		handle, found, err := e.t.Seek(e.ctx, e.seekHandle)
		if err != nil {
			return nil, errors.Trace(err)
		}
		if !found {
			// No handle at or after seekHandle anywhere in the table.
			return nil, nil
		}
		if handle > ran.HighVal {
			// The handle is out of the current range, but may be in following ranges.
			// We seek to the range that may contains the handle, so we
			// don't need to seek key again.
			inRange := e.seekRange(handle)
			if !inRange {
				// The handle may be less than the current range low value, can not
				// return directly.
				continue
			}
		}
		row, err := e.getRow(handle)
		if err != nil {
			return nil, errors.Trace(err)
		}
		// Resume after this handle on the next call.
		e.seekHandle = handle + 1
		return row, nil
	}
}
|
|
|
|
// seekRange increments the range cursor to the range
|
|
// with high value greater or equal to handle.
|
|
func (e *TableScanExec) seekRange(handle int64) (inRange bool) {
|
|
for {
|
|
e.cursor++
|
|
if e.cursor >= len(e.ranges) {
|
|
return false
|
|
}
|
|
ran := e.ranges[e.cursor]
|
|
if handle < ran.LowVal {
|
|
return false
|
|
}
|
|
if handle > ran.HighVal {
|
|
continue
|
|
}
|
|
return true
|
|
}
|
|
}
|
|
|
|
// getRow fetches the row at handle, restricted to e.columns, and attaches a
// RowKeyEntry so later executors (e.g. SELECT ... FOR UPDATE, DELETE) can
// locate the underlying record.
func (e *TableScanExec) getRow(handle int64) (*Row, error) {
	row := &Row{}
	var err error

	// Wrap the ColumnInfos into table.Columns for RowWithCols.
	columns := make([]*table.Column, len(e.schema))
	for i, v := range e.columns {
		columns[i] = &table.Column{ColumnInfo: *v}
	}
	row.Data, err = e.t.RowWithCols(e.ctx, handle, columns)
	if err != nil {
		return nil, errors.Trace(err)
	}

	// Put rowKey to the tail of record row.
	rke := &RowKeyEntry{
		Tbl:         e.t,
		Handle:      handle,
		TableAsName: e.asName,
	}
	row.RowKeys = append(row.RowKeys, rke)
	return row, nil
}
|
|
|
|
// Close implements Executor Close interface.
// It resets scan state so the executor can be reused.
func (e *TableScanExec) Close() error {
	e.iter = nil
	e.cursor = 0
	return nil
}
|
|
|
|
// SortExec represents sorting executor.
// It materializes all source rows, sorts them by ByItems, then emits them
// in order. It implements sort.Interface over Rows.
type SortExec struct {
	Src     Executor
	ByItems []*plan.ByItems // sort keys with direction
	Rows    []*orderByRow   // materialized rows with pre-evaluated sort keys
	ctx     context.Context
	Idx     int  // emit cursor into Rows
	fetched bool // true once all source rows are materialized and sorted
	err     error // first comparison error encountered during sorting
	schema  expression.Schema
}
|
|
|
|
// Close implements Executor Close interface.
// It drops the materialized rows and resets state before closing Src.
func (e *SortExec) Close() error {
	e.fetched = false
	e.Rows = nil
	return e.Src.Close()
}
|
|
|
|
// Schema implements Executor Schema interface.
func (e *SortExec) Schema() expression.Schema {
	return e.schema
}
|
|
|
|
// Fields implements Executor Fields interface.
// Sorting does not change row shape, so Src's fields are exposed.
func (e *SortExec) Fields() []*ast.ResultField {
	return e.Src.Fields()
}
|
|
|
|
// Len returns the number of rows.
func (e *SortExec) Len() int {
	return len(e.Rows)
}
|
|
|
|
// Swap implements sort.Interface Swap interface.
func (e *SortExec) Swap(i, j int) {
	e.Rows[i], e.Rows[j] = e.Rows[j], e.Rows[i]
}
|
|
|
|
// Less implements sort.Interface Less interface.
|
|
func (e *SortExec) Less(i, j int) bool {
|
|
for index, by := range e.ByItems {
|
|
v1 := e.Rows[i].key[index]
|
|
v2 := e.Rows[j].key[index]
|
|
|
|
ret, err := v1.CompareDatum(v2)
|
|
if err != nil {
|
|
e.err = errors.Trace(err)
|
|
return true
|
|
}
|
|
|
|
if by.Desc {
|
|
ret = -ret
|
|
}
|
|
|
|
if ret < 0 {
|
|
return true
|
|
} else if ret > 0 {
|
|
return false
|
|
}
|
|
}
|
|
|
|
return false
|
|
}
|
|
|
|
// Next implements Executor Next interface.
// On first call it drains Src, evaluating the sort keys for each row, sorts
// everything in memory, then emits rows one at a time via Idx.
func (e *SortExec) Next() (*Row, error) {
	if !e.fetched {
		// Materialize the whole input.
		for {
			srcRow, err := e.Src.Next()
			if err != nil {
				return nil, errors.Trace(err)
			}
			if srcRow == nil {
				break
			}
			orderRow := &orderByRow{
				row: srcRow,
				key: make([]types.Datum, len(e.ByItems)),
			}
			// Pre-evaluate sort keys once per row so Less is cheap.
			for i, byItem := range e.ByItems {
				orderRow.key[i], err = byItem.Expr.Eval(srcRow.Data, e.ctx)
				if err != nil {
					return nil, errors.Trace(err)
				}
			}
			e.Rows = append(e.Rows, orderRow)
		}
		sort.Sort(e)
		e.fetched = true
	}
	// Comparison errors inside Less are deferred to here.
	if e.err != nil {
		return nil, errors.Trace(e.err)
	}
	if e.Idx >= len(e.Rows) {
		return nil, nil
	}
	row := e.Rows[e.Idx].row
	e.Idx++
	return row, nil
}
|
|
|
|
// TopnExec implements a top n algo.
// It embeds SortExec and keeps at most Offset+Count rows in a heap while
// scanning the source, instead of materializing everything.
type TopnExec struct {
	SortExec
	limit      *plan.Limit
	totalCount int // limit.Offset + limit.Count: maximum number of rows retained
	heapSize   int // current number of rows in the heap
}
|
|
|
|
// Less implements heap.Interface Less interface.
// Note the comparison is inverted relative to SortExec.Less (ret > 0 means
// "less"), so the heap root is the row that sorts LAST among those kept —
// i.e. the first candidate to evict when a better row arrives.
func (e *TopnExec) Less(i, j int) bool {
	for index, by := range e.ByItems {
		v1 := e.Rows[i].key[index]
		v2 := e.Rows[j].key[index]

		ret, err := v1.CompareDatum(v2)
		if err != nil {
			// Stash the error; Next surfaces it via SortExec.err handling.
			e.err = errors.Trace(err)
			return true
		}

		if by.Desc {
			ret = -ret
		}

		// Inverted: the "greater" row is treated as smaller for the heap.
		if ret > 0 {
			return true
		} else if ret < 0 {
			return false
		}
	}

	return false
}
|
|
|
|
// Len implements heap.Interface Len interface.
// Only the first heapSize entries of Rows are part of the heap.
func (e *TopnExec) Len() int {
	return e.heapSize
}
|
|
|
|
// Push implements heap.Interface Push interface.
func (e *TopnExec) Push(x interface{}) {
	e.Rows = append(e.Rows, x.(*orderByRow))
	e.heapSize++
}
|
|
|
|
// Pop implements heap.Interface Pop interface.
// The popped element is left in place at Rows[heapSize] (heap.Pop swaps it
// to the end first), which is how Next extracts rows in order.
func (e *TopnExec) Pop() interface{} {
	e.heapSize--
	return nil
}
|
|
|
|
// Next implements Executor Next interface.
// On first call it scans Src keeping the best totalCount rows in a max-heap
// (root = worst retained row, thanks to the inverted Less). Afterwards the
// retained rows are ordered — via sort or repeated heap.Pop — and emitted
// starting from Idx = Offset.
func (e *TopnExec) Next() (*Row, error) {
	if !e.fetched {
		e.Idx = int(e.limit.Offset)
		e.totalCount = int(e.limit.Offset + e.limit.Count)
		// One extra slot so a candidate can sit at Rows[heapSize] while
		// being compared against the root.
		e.Rows = make([]*orderByRow, 0, e.totalCount+1)
		e.heapSize = 0
		for {
			srcRow, err := e.Src.Next()
			if err != nil {
				return nil, errors.Trace(err)
			}
			if srcRow == nil {
				break
			}
			orderRow := &orderByRow{
				row: srcRow,
				key: make([]types.Datum, len(e.ByItems)),
			}
			for i, byItem := range e.ByItems {
				orderRow.key[i], err = byItem.Expr.Eval(srcRow.Data, e.ctx)
				if err != nil {
					return nil, errors.Trace(err)
				}
			}
			if e.totalCount == e.heapSize {
				// Heap full: stage the candidate past the heap end and,
				// if it beats the current worst (the root), swap it in
				// and restore the heap property.
				e.Rows = append(e.Rows, orderRow)
				if e.Less(0, e.heapSize) {
					e.Swap(0, e.heapSize)
					heap.Fix(e, 0)
				}
				// Discard whichever row ended up in the staging slot.
				e.Rows = e.Rows[:e.heapSize]
			} else {
				heap.Push(e, orderRow)
			}
		}
		if e.limit.Offset == 0 {
			// No offset: a full sort of the retained rows is fine.
			sort.Sort(&e.SortExec)
		} else {
			// Pop Count rows; heap.Pop moves each popped row to the end
			// of the heap region, yielding them in ascending output order
			// from index heapSize upward (which Idx = Offset lands on).
			for i := 0; i < int(e.limit.Count) && e.Len() > 0; i++ {
				heap.Pop(e)
			}
		}
		e.fetched = true
	}
	if e.Idx >= len(e.Rows) {
		return nil, nil
	}
	row := e.Rows[e.Idx].row
	e.Idx++
	return row, nil
}
|
|
|
|
// ApplyExec represents apply executor.
// Apply gets one row from outer executor and gets one row from inner executor according to outer row.
type ApplyExec struct {
	schema      expression.Schema
	Src         Executor          // outer child
	outerSchema expression.Schema // correlated columns whose values are bound from each outer row
	innerExec   Executor          // inner (correlated) child, re-run per outer row
	checker     *conditionChecker // non-nil for ALL/ANY semantics; nil for plain apply
}
|
|
|
|
// conditionChecker checks if all or any of the row match this condition.
type conditionChecker struct {
	cond        expression.Expression
	trimLen     int // length of the outer row prefix (inner columns appended after it)
	ctx         context.Context
	all         bool // true for ALL semantics, false for ANY
	dataHasNull bool // set when cond evaluated to NULL for some row (affects ANY result)
}
|
|
|
|
// Check returns finished for checking if the input row can determine the final result,
|
|
// and returns data for the eval result.
|
|
func (c *conditionChecker) Check(rowData []types.Datum) (finished bool, data types.Datum, err error) {
|
|
data, err = c.cond.Eval(rowData, c.ctx)
|
|
if err != nil {
|
|
return false, data, errors.Trace(err)
|
|
}
|
|
var matched int64
|
|
if data.IsNull() {
|
|
c.dataHasNull = true
|
|
matched = 0
|
|
} else {
|
|
matched, err = data.ToBool()
|
|
if err != nil {
|
|
return false, data, errors.Trace(err)
|
|
}
|
|
}
|
|
return (matched != 0) != c.all, data, nil
|
|
}
|
|
|
|
// Schema implements Executor Schema interface.
func (e *ApplyExec) Schema() expression.Schema {
	return e.schema
}
|
|
|
|
// Fields implements Executor Fields interface.
func (e *ApplyExec) Fields() []*ast.ResultField {
	return nil
}
|
|
|
|
// Close implements Executor Close interface.
|
|
func (e *ApplyExec) Close() error {
|
|
if e.checker != nil {
|
|
e.checker.dataHasNull = false
|
|
}
|
|
return e.Src.Close()
|
|
}
|
|
|
|
// Next implements Executor Next interface.
// For each outer row it binds the correlated columns, iterates the inner
// executor, and either appends the inner row (plain apply) or reduces the
// inner rows to a single boolean/NULL datum (ALL/ANY via checker).
func (e *ApplyExec) Next() (*Row, error) {
	srcRow, err := e.Src.Next()
	if err != nil {
		return nil, errors.Trace(err)
	}
	if srcRow == nil {
		return nil, nil
	}
	for {
		// Bind the outer row's values into the correlated columns so the
		// inner executor sees them.
		for _, col := range e.outerSchema {
			idx := col.Index
			col.SetValue(&srcRow.Data[idx])
		}
		innerRow, err := e.innerExec.Next()
		if err != nil {
			return nil, errors.Trace(err)
		}
		trimLen := len(srcRow.Data)
		if innerRow != nil {
			srcRow.Data = append(srcRow.Data, innerRow.Data...)
		}
		if e.checker == nil {
			// Plain apply: emit outer+inner immediately.
			// NOTE(review): the Close error is ignored here.
			e.innerExec.Close()
			return srcRow, nil
		}
		if innerRow == nil {
			// Inner exhausted without an early decision.
			var d types.Datum
			// If we can't determine the result until the last row comes, the all must be true and any must not be true.
			// If the any have met a null, the result will be null.
			if e.checker.dataHasNull && !e.checker.all {
				d = types.NewDatum(nil)
			} else {
				d = types.NewDatum(e.checker.all)
			}
			srcRow.Data = append(srcRow.Data, d)
			e.checker.dataHasNull = false
			e.innerExec.Close()
			return srcRow, nil
		}
		finished, data, err := e.checker.Check(srcRow.Data)
		if err != nil {
			return nil, errors.Trace(err)
		}
		// Drop the appended inner columns before the next iteration.
		srcRow.Data = srcRow.Data[:trimLen]
		if finished {
			e.checker.dataHasNull = false
			e.innerExec.Close()
			srcRow.Data = append(srcRow.Data, data)
			return srcRow, nil
		}
	}
}
|
|
|
|
// ExistsExec represents exists executor.
// It returns a single boolean row: whether Src yields at least one row.
type ExistsExec struct {
	schema    expression.Schema
	Src       Executor
	evaluated bool // set once the boolean row has been produced
}
|
|
|
|
// Schema implements Executor Schema interface.
func (e *ExistsExec) Schema() expression.Schema {
	return e.schema
}
|
|
|
|
// Fields implements Executor Fields interface.
func (e *ExistsExec) Fields() []*ast.ResultField {
	return nil
}
|
|
|
|
// Close implements Executor Close interface.
// It resets evaluated so the executor can run again.
func (e *ExistsExec) Close() error {
	e.evaluated = false
	return e.Src.Close()
}
|
|
|
|
// Next implements Executor Next interface.
|
|
func (e *ExistsExec) Next() (*Row, error) {
|
|
if !e.evaluated {
|
|
e.evaluated = true
|
|
srcRow, err := e.Src.Next()
|
|
if err != nil {
|
|
return nil, errors.Trace(err)
|
|
}
|
|
return &Row{Data: []types.Datum{types.NewDatum(srcRow != nil)}}, nil
|
|
}
|
|
return nil, nil
|
|
}
|
|
|
|
// MaxOneRowExec checks if a query returns no more than one row.
type MaxOneRowExec struct {
	schema    expression.Schema
	Src       Executor
	evaluated bool // set once the (at most one) row has been produced
}
|
|
|
|
// Schema implements Executor Schema interface.
func (e *MaxOneRowExec) Schema() expression.Schema {
	return e.schema
}
|
|
|
|
// Fields implements Executor Fields interface.
func (e *MaxOneRowExec) Fields() []*ast.ResultField {
	return nil
}
|
|
|
|
// Close implements Executor Close interface.
// It resets evaluated so the executor can run again.
func (e *MaxOneRowExec) Close() error {
	e.evaluated = false
	return e.Src.Close()
}
|
|
|
|
// Next implements Executor Next interface.
|
|
func (e *MaxOneRowExec) Next() (*Row, error) {
|
|
if !e.evaluated {
|
|
e.evaluated = true
|
|
srcRow, err := e.Src.Next()
|
|
if err != nil {
|
|
return nil, errors.Trace(err)
|
|
}
|
|
if srcRow == nil {
|
|
return &Row{Data: make([]types.Datum, len(e.schema))}, nil
|
|
}
|
|
srcRow1, err := e.Src.Next()
|
|
if err != nil {
|
|
return nil, errors.Trace(err)
|
|
}
|
|
if srcRow1 != nil {
|
|
return nil, errors.New("Subquery returns more than 1 row.")
|
|
}
|
|
return srcRow, nil
|
|
}
|
|
return nil, nil
|
|
}
|
|
|
|
// TrimExec truncates src rows.
// It keeps only the first len columns of each row from Src.
type TrimExec struct {
	schema expression.Schema
	Src    Executor
	len    int // number of leading columns to keep
}
|
|
|
|
// Schema implements Executor Schema interface.
func (e *TrimExec) Schema() expression.Schema {
	return e.schema
}
|
|
|
|
// Fields implements Executor Fields interface.
func (e *TrimExec) Fields() []*ast.ResultField {
	return nil
}
|
|
|
|
// Close implements Executor Close interface.
func (e *TrimExec) Close() error {
	return e.Src.Close()
}
|
|
|
|
// Next implements Executor Next interface.
|
|
func (e *TrimExec) Next() (*Row, error) {
|
|
row, err := e.Src.Next()
|
|
if err != nil {
|
|
return nil, errors.Trace(err)
|
|
}
|
|
if row == nil {
|
|
return nil, nil
|
|
}
|
|
row.Data = row.Data[:e.len]
|
|
return row, nil
|
|
}
|
|
|
|
// UnionExec represents union executor.
// It drains each source in order, converting rows from later sources to the
// column types of the first.
type UnionExec struct {
	fields []*ast.ResultField
	schema expression.Schema // column types come from the first select statement
	Srcs   []Executor        // child selects, consumed in order
	cursor int               // index of the source currently being drained
}
|
|
|
|
// Schema implements Executor Schema interface.
func (e *UnionExec) Schema() expression.Schema {
	return e.schema
}
|
|
|
|
// Fields implements Executor Fields interface.
func (e *UnionExec) Fields() []*ast.ResultField {
	return e.fields
}
|
|
|
|
// Next implements Executor Next interface.
|
|
func (e *UnionExec) Next() (*Row, error) {
|
|
for {
|
|
if e.cursor >= len(e.Srcs) {
|
|
return nil, nil
|
|
}
|
|
sel := e.Srcs[e.cursor]
|
|
row, err := sel.Next()
|
|
if err != nil {
|
|
return nil, errors.Trace(err)
|
|
}
|
|
if row == nil {
|
|
e.cursor++
|
|
continue
|
|
}
|
|
if e.cursor != 0 {
|
|
for i := range row.Data {
|
|
// The column value should be casted as the same type of the first select statement in corresponding position.
|
|
col := e.schema[i]
|
|
var val types.Datum
|
|
val, err = row.Data[i].ConvertTo(col.RetType)
|
|
if err != nil {
|
|
return nil, errors.Trace(err)
|
|
}
|
|
row.Data[i] = val
|
|
}
|
|
}
|
|
return row, nil
|
|
}
|
|
}
|
|
|
|
// Close implements Executor Close interface.
|
|
func (e *UnionExec) Close() error {
|
|
e.cursor = 0
|
|
for _, sel := range e.Srcs {
|
|
er := sel.Close()
|
|
if er != nil {
|
|
return errors.Trace(er)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// DummyScanExec represents a dummy table.
// It produces no rows at all.
type DummyScanExec struct {
	schema expression.Schema
}
|
|
|
|
// Schema implements Executor Schema interface.
func (e *DummyScanExec) Schema() expression.Schema {
	return e.schema
}
|
|
|
|
// Close implements Executor Close interface.
// Nothing to release.
func (e *DummyScanExec) Close() error {
	return nil
}
|
|
|
|
// Fields implements Executor Fields interface.
func (e *DummyScanExec) Fields() []*ast.ResultField {
	return nil
}
|
|
|
|
// Next implements Executor Next interface.
// A dummy scan is always exhausted.
func (e *DummyScanExec) Next() (*Row, error) {
	return nil, nil
}
|