Files
tidb/executor/new_executor_xapi.go
2016-07-13 18:16:51 +08:00

421 lines
11 KiB
Go

// Copyright 2016 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package executor
import (
"math"
"sort"
"sync"
"time"
"github.com/golang/protobuf/proto"
"github.com/juju/errors"
"github.com/ngaut/log"
"github.com/pingcap/tidb/ast"
"github.com/pingcap/tidb/context"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/model"
"github.com/pingcap/tidb/plan"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/types"
"github.com/pingcap/tidb/xapi"
"github.com/pingcap/tipb/go-tipb"
)
// NewXSelectIndexExec represents XAPI select index executor without result fields.
type NewXSelectIndexExec struct {
tableInfo *model.TableInfo
table table.Table
asName *model.CIStr
ctx context.Context
supportDesc bool
isMemDB bool
result *xapi.SelectResult
subResult *xapi.SubResult
where *tipb.Expr
tasks []*lookupTableTask
taskCursor int
indexOrder map[int64]int
indexPlan *plan.PhysicalIndexScan
mu sync.Mutex
}
// Fields implements Exec Fields interface.
func (e *NewXSelectIndexExec) Fields() []*ast.ResultField {
return nil
}
// Schema implements Exec Schema interface.
func (e *NewXSelectIndexExec) Schema() expression.Schema {
return e.indexPlan.GetSchema()
}
// Close implements Exec Close interface.
func (e *NewXSelectIndexExec) Close() error {
e.result = nil
e.subResult = nil
return nil
}
// Next implements Executor Next interface.
func (e *NewXSelectIndexExec) Next() (*Row, error) {
if e.tasks == nil {
startTs := time.Now()
handles, err := e.fetchHandles()
if err != nil {
return nil, errors.Trace(err)
}
log.Debugf("[TIME_INDEX_SCAN] time: %v handles: %d", time.Now().Sub(startTs), len(handles))
e.buildTableTasks(handles)
limitCount := int64(-1)
if e.indexPlan.LimitCount != nil {
limitCount = *e.indexPlan.LimitCount
}
concurrency := 2
if limitCount == -1 {
concurrency = len(e.tasks)
} else if limitCount > int64(BaseLookupTableTaskSize) {
concurrency = int(limitCount/int64(BaseLookupTableTaskSize) + 1)
}
log.Debugf("[TIME_INDEX_TABLE_CONCURRENT_SCAN] start %d workers", concurrency)
e.runTableTasks(concurrency)
}
for {
if e.taskCursor >= len(e.tasks) {
return nil, nil
}
task := e.tasks[e.taskCursor]
if !task.done {
startTs := time.Now()
err := <-task.doneCh
if err != nil {
return nil, errors.Trace(err)
}
task.done = true
log.Debugf("[TIME_INDEX_TABLE_SCAN] time: %v handles: %d", time.Now().Sub(startTs), len(task.handles))
}
if task.cursor < len(task.rows) {
row := task.rows[task.cursor]
task.cursor++
return row, nil
}
e.taskCursor++
}
}
// fetchHandle fetches all handles from the index.
// This should be optimized to fetch handles in batch.
func (e *NewXSelectIndexExec) fetchHandles() ([]int64, error) {
idxResult, err := e.doIndexRequest()
if err != nil {
return nil, errors.Trace(err)
}
handles, err := extractHandlesFromIndexResult(idxResult)
if err != nil {
return nil, errors.Trace(err)
}
// TODO: support out of order when implement cbo
if !e.indexPlan.OutOfOrder {
// Save the index order.
e.indexOrder = make(map[int64]int)
for i, h := range handles {
e.indexOrder[h] = i
}
}
return handles, nil
}
func (e *NewXSelectIndexExec) doIndexRequest() (*xapi.SelectResult, error) {
txn, err := e.ctx.GetTxn(false)
if err != nil {
return nil, errors.Trace(err)
}
selIdxReq := new(tipb.SelectRequest)
startTs := txn.StartTS()
selIdxReq.StartTs = &startTs
selIdxReq.IndexInfo = xapi.IndexToProto(e.table.Meta(), e.indexPlan.Index)
// Push limit to index request only if there is not filter conditions.
selIdxReq.Limit = e.indexPlan.LimitCount
if e.indexPlan.Desc {
selIdxReq.OrderBy = append(selIdxReq.OrderBy, &tipb.ByItem{Desc: &e.indexPlan.Desc})
}
fieldTypes := make([]*types.FieldType, len(e.indexPlan.Columns))
for i, v := range e.indexPlan.Index.Columns {
fieldTypes[i] = &(e.table.Cols()[v.Offset].FieldType)
}
selIdxReq.Ranges, err = indexRangesToPBRanges(e.indexPlan.Ranges, fieldTypes)
if err != nil {
return nil, errors.Trace(err)
}
concurrency := 1
if e.indexPlan.OutOfOrder {
concurrency = 10
}
return xapi.Select(txn.GetClient(), selIdxReq, concurrency)
}
func (e *NewXSelectIndexExec) buildTableTasks(handles []int64) {
// Build tasks with increasing batch size.
var taskSizes []int
total := len(handles)
batchSize := BaseLookupTableTaskSize
for total > 0 {
if batchSize > total {
batchSize = total
}
taskSizes = append(taskSizes, batchSize)
total -= batchSize
if batchSize < MaxLookupTableTaskSize {
batchSize *= 2
}
}
if e.indexPlan.Desc && !e.supportDesc {
// Reverse tasks sizes.
for i := 0; i < len(taskSizes)/2; i++ {
j := len(taskSizes) - i - 1
taskSizes[i], taskSizes[j] = taskSizes[j], taskSizes[i]
}
}
e.tasks = make([]*lookupTableTask, len(taskSizes))
for i, size := range taskSizes {
task := &lookupTableTask{
handles: handles[:size],
}
task.doneCh = make(chan error, 1)
handles = handles[size:]
e.tasks[i] = task
}
if e.indexPlan.Desc && !e.supportDesc {
// Reverse tasks order.
for i := 0; i < len(e.tasks)/2; i++ {
j := len(e.tasks) - i - 1
e.tasks[i], e.tasks[j] = e.tasks[j], e.tasks[i]
}
}
}
func (e *NewXSelectIndexExec) runTableTasks(n int) {
for i := 0; i < n && i < len(e.tasks); i++ {
go e.pickAndExecTask()
}
}
func (e *NewXSelectIndexExec) pickAndExecTask() {
for {
// Pick a new task.
e.mu.Lock()
var task *lookupTableTask
for _, t := range e.tasks {
if t.status == taskNew {
task = t
task.status = taskRunning
break
}
}
e.mu.Unlock()
if task == nil {
// No more task to run.
break
}
// Execute the picked task.
err := e.executeTask(task)
e.mu.Lock()
task.status = taskDone
e.mu.Unlock()
task.doneCh <- err
}
}
func (e *NewXSelectIndexExec) executeTask(task *lookupTableTask) error {
sort.Sort(int64Slice(task.handles))
tblResult, err := e.doTableRequest(task.handles)
if err != nil {
return errors.Trace(err)
}
task.rows, err = e.extractRowsFromTableResult(e.table, tblResult)
if err != nil {
return errors.Trace(err)
}
// TODO: check this
if !e.indexPlan.OutOfOrder {
// Restore the index order.
sorter := &rowsSorter{order: e.indexOrder, rows: task.rows}
if e.indexPlan.Desc && !e.supportDesc {
sort.Sort(sort.Reverse(sorter))
} else {
sort.Sort(sorter)
}
}
return nil
}
func (e *NewXSelectIndexExec) extractRowsFromTableResult(t table.Table, tblResult *xapi.SelectResult) ([]*Row, error) {
var rows []*Row
for {
subResult, err := tblResult.Next()
if err != nil {
return nil, errors.Trace(err)
}
if subResult == nil {
break
}
subRows, err := e.extractRowsFromSubResult(t, subResult)
if err != nil {
return nil, errors.Trace(err)
}
rows = append(rows, subRows...)
}
return rows, nil
}
func (e *NewXSelectIndexExec) extractRowsFromSubResult(t table.Table, subResult *xapi.SubResult) ([]*Row, error) {
var rows []*Row
for {
h, rowData, err := subResult.Next()
if err != nil {
return nil, errors.Trace(err)
}
if rowData == nil {
break
}
row := resultRowToRow(t, h, rowData, e.indexPlan.TableAsName)
rows = append(rows, row)
}
return rows, nil
}
func (e *NewXSelectIndexExec) doTableRequest(handles []int64) (*xapi.SelectResult, error) {
txn, err := e.ctx.GetTxn(false)
if err != nil {
return nil, errors.Trace(err)
}
// The handles are not in original index order, so we can't push limit here.
selTableReq := new(tipb.SelectRequest)
startTs := txn.StartTS()
selTableReq.StartTs = &startTs
selTableReq.TableInfo = &tipb.TableInfo{
TableId: proto.Int64(e.table.Meta().ID),
}
selTableReq.TableInfo.Columns = xapi.ColumnsToProto(e.indexPlan.Columns, e.table.Meta().PKIsHandle)
for _, h := range handles {
if h == math.MaxInt64 {
// We can't convert MaxInt64 into an left closed, right open range.
continue
}
pbRange := new(tipb.KeyRange)
pbRange.Low = codec.EncodeInt(nil, h)
pbRange.High = kv.Key(pbRange.Low).PrefixNext()
selTableReq.Ranges = append(selTableReq.Ranges, pbRange)
}
selTableReq.Where = e.where
// Aggregate Info
resp, err := xapi.Select(txn.GetClient(), selTableReq, defaultConcurrency)
if err != nil {
return nil, errors.Trace(err)
}
return resp, nil
}
// NewXSelectTableExec represents XAPI select executor without result fields.
type NewXSelectTableExec struct {
tableInfo *model.TableInfo
table table.Table
asName *model.CIStr
ctx context.Context
supportDesc bool
isMemDB bool
result *xapi.SelectResult
subResult *xapi.SubResult
where *tipb.Expr
Columns []*model.ColumnInfo
schema expression.Schema
ranges []plan.TableRange
}
// Schema implements Executor Schema interface.
func (e *NewXSelectTableExec) Schema() expression.Schema {
return e.schema
}
func (e *NewXSelectTableExec) doRequest() error {
txn, err := e.ctx.GetTxn(false)
if err != nil {
return errors.Trace(err)
}
selReq := new(tipb.SelectRequest)
startTs := txn.StartTS()
selReq.StartTs = &startTs
selReq.Where = e.where
selReq.Ranges = tableRangesToPBRanges(e.ranges)
columns := e.Columns
selReq.TableInfo = &tipb.TableInfo{
TableId: proto.Int64(e.tableInfo.ID),
}
selReq.TableInfo.Columns = xapi.ColumnsToProto(columns, e.tableInfo.PKIsHandle)
e.result, err = xapi.Select(txn.GetClient(), selReq, defaultConcurrency)
if err != nil {
return errors.Trace(err)
}
return nil
}
// Close implements Executor Close interface.
func (e *NewXSelectTableExec) Close() error {
e.result = nil
e.subResult = nil
return nil
}
// Next implements Executor interface.
func (e *NewXSelectTableExec) Next() (*Row, error) {
if e.result == nil {
err := e.doRequest()
if err != nil {
return nil, errors.Trace(err)
}
}
for {
if e.subResult == nil {
var err error
startTs := time.Now()
e.subResult, err = e.result.Next()
if err != nil {
return nil, errors.Trace(err)
}
if e.subResult == nil {
return nil, nil
}
log.Debugf("[TIME_TABLE_SCAN] %v", time.Now().Sub(startTs))
}
h, rowData, err := e.subResult.Next()
if err != nil {
return nil, errors.Trace(err)
}
if rowData == nil {
e.subResult = nil
continue
}
return resultRowToRow(e.table, h, rowData, e.asName), nil
}
}
// Fields implements Executor interface.
func (e *NewXSelectTableExec) Fields() []*ast.ResultField {
return nil
}