tidb/executor/mpp_gather.go


// Copyright 2020 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package executor

import (
	"context"

	"github.com/pingcap/errors"
	"github.com/pingcap/kvproto/pkg/mpp"
	"github.com/pingcap/tidb/distsql"
	"github.com/pingcap/tidb/expression"
	"github.com/pingcap/tidb/infoschema"
	"github.com/pingcap/tidb/kv"
	plannercore "github.com/pingcap/tidb/planner/core"
	"github.com/pingcap/tidb/sessionctx"
	"github.com/pingcap/tidb/table"
	"github.com/pingcap/tidb/util/chunk"
	"github.com/pingcap/tidb/util/plancodec"
	"github.com/pingcap/tidb/util/ranger"
	"github.com/pingcap/tipb/go-tipb"
)

// Currently we only use mpp for broadcast join.
func useMPPExecution(ctx sessionctx.Context, tr *plannercore.PhysicalTableReader) bool {
	if !ctx.GetSessionVars().AllowMPPExecution {
		return false
	}
	if tr.StoreType != kv.TiFlash {
		return false
	}
	return true
}

type mppTask struct {
	taskInfo kv.MPPTask // on which store this task will execute
	id       int64      // mppTaskID
	startTs  uint64
	tableID  int64 // physical table id
}

// ToPB generates the pb structure.
func (t *mppTask) ToPB() *mpp.TaskMeta {
	meta := &mpp.TaskMeta{
		StartTs: t.startTs,
		TaskId:  t.id,
	}
	if t.id != -1 {
		meta.Address = t.taskInfo.GetAddress()
	}
	return meta
}

// planFragment is cut from the whole pushed-down plan by pipeline breakers.
// Communication between plan fragments is always done through shuffling / broadcasting / passing through.
type planFragment struct {
	p plannercore.PhysicalPlan

	// following fields are filled during getPlanFragments.
	// TODO: Strictly speaking, not every plan fragment contains a table scan. We can keep this assumption until more plans are supported.
	tableScan         *plannercore.PhysicalTableScan // result physical table scan
	exchangeReceivers []*ExchangeReceiver            // data receivers

	// following fields are filled after scheduling.
	exchangeSender *ExchangeSender // data exporter
}

// ExchangeReceiver accepts connections and receives data passively.
type ExchangeReceiver struct {
	plannercore.PhysicalExchangerBase

	tasks   []*mppTask
	childPf *planFragment
	schema  *expression.Schema
}

// ToPB generates the pb structure.
func (e *ExchangeReceiver) ToPB(ctx sessionctx.Context, storeType kv.StoreType) (*tipb.Executor, error) {
	encodedTask := make([][]byte, 0, len(e.tasks))
	for _, task := range e.tasks {
		encodedStr, err := task.ToPB().Marshal()
		if err != nil {
			return nil, errors.Trace(err)
		}
		encodedTask = append(encodedTask, encodedStr)
	}
	fieldTypes := make([]*tipb.FieldType, 0, len(e.schema.Columns))
	for _, column := range e.schema.Columns {
		fieldTypes = append(fieldTypes, expression.ToPBFieldType(column.RetType))
	}
	ecExec := &tipb.ExchangeReceiver{
		EncodedTaskMeta: encodedTask,
		FieldTypes:      fieldTypes,
	}
	executorID := e.ExplainID().String()
	return &tipb.Executor{
		Tp:               tipb.ExecType_TypeExchangeReceiver,
		ExchangeReceiver: ecExec,
		ExecutorId:       &executorID,
	}, nil
}

// ExchangeSender dispatches data to upstream tasks. That means push mode processing.
type ExchangeSender struct {
	plannercore.PhysicalExchangerBase

	tasks        []*mppTask
	exchangeType tipb.ExchangeType
}

// ToPB generates the pb structure.
func (e *ExchangeSender) ToPB(ctx sessionctx.Context, storeType kv.StoreType) (*tipb.Executor, error) {
	child, err := e.Children()[0].ToPB(ctx, kv.TiFlash)
	if err != nil {
		return nil, errors.Trace(err)
	}
	encodedTask := make([][]byte, 0, len(e.tasks))
	for _, task := range e.tasks {
		encodedStr, err := task.ToPB().Marshal()
		if err != nil {
			return nil, errors.Trace(err)
		}
		encodedTask = append(encodedTask, encodedStr)
	}
	ecExec := &tipb.ExchangeSender{
		Tp:              e.exchangeType,
		EncodedTaskMeta: encodedTask,
		Child:           child,
	}
	executorID := e.ExplainID().String()
	return &tipb.Executor{
		Tp:             tipb.ExecType_TypeExchangeSender,
		ExchangeSender: ecExec,
		ExecutorId:     &executorID,
	}, nil
}

// MPPGather dispatches MPP tasks and reads data from the root tasks.
type MPPGather struct {
	// following fields are needed for construction.
	baseExecutor
	is           infoschema.InfoSchema
	originalPlan plannercore.PhysicalPlan
	startTS      uint64

	allocTaskID int64
	mppReqs     []*kv.MPPDispatchRequest

	respIter distsql.SelectResult
}
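
// constructMPPTasksImpl builds the MPP tasks for one plan fragment. If the
// fragment's table scan targets a partitioned table, it prunes the partitions
// first and builds tasks for every remaining physical partition; otherwise it
// builds tasks for the single physical table directly.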
func (e *MPPGather) constructMPPTasksImpl(ctx context.Context, p *planFragment) ([]*mppTask, error) {
	if p.tableScan.Table.GetPartitionInfo() == nil {
		return e.constructSinglePhysicalTable(ctx, p.tableScan.Table.ID, p.tableScan.Ranges)
	}
	tmp, _ := e.is.TableByID(p.tableScan.Table.ID)
	tbl := tmp.(table.PartitionedTable)
	partitions, err := partitionPruning(e.ctx, tbl, p.tableScan.PartitionInfo.PruningConds, p.tableScan.PartitionInfo.PartitionNames, p.tableScan.PartitionInfo.Columns, p.tableScan.PartitionInfo.ColumnNames)
	if err != nil {
		return nil, errors.Trace(err)
	}
	allTasks := make([]*mppTask, 0)
	for _, part := range partitions {
		partTasks, err := e.constructSinglePhysicalTable(ctx, part.GetPhysicalID(), p.tableScan.Ranges)
		if err != nil {
			return nil, errors.Trace(err)
		}
		allTasks = append(allTasks, partTasks...)
	}
	return allTasks, nil
}
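
// constructSinglePhysicalTable converts the scan ranges of one physical table
// into key ranges, asks the MPP client which stores should serve them, and
// allocates one mppTask (with a fresh task ID) per returned store.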
func (e *MPPGather) constructSinglePhysicalTable(ctx context.Context, tableID int64, ranges []*ranger.Range) ([]*mppTask, error) {
	kvRanges := distsql.TableRangesToKVRanges(tableID, ranges, nil)
	req := &kv.MPPBuildTasksRequest{KeyRanges: kvRanges}
	stores, err := e.ctx.GetMPPClient().ConstructMPPTasks(ctx, req)
	if err != nil {
		return nil, errors.Trace(err)
	}
	tasks := make([]*mppTask, 0, len(stores))
	for _, store := range stores {
		e.allocTaskID++
		tasks = append(tasks, &mppTask{taskInfo: store, id: e.allocTaskID, startTs: e.startTS, tableID: tableID})
	}
	return tasks, nil
}
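
// getPlanFragments splits the pushed-down plan into plan fragments at pipeline
// breakers. Currently the only breaker is broadcast join: its broadcast side is
// cut into a child fragment connected back through an ExchangeSender /
// ExchangeReceiver pair.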
func getPlanFragments(ctx sessionctx.Context, p plannercore.PhysicalPlan, pf *planFragment) {
	switch x := p.(type) {
	case *plannercore.PhysicalTableScan:
		x.IsGlobalRead = false
		pf.tableScan = x
	case *plannercore.PhysicalBroadCastJoin:
		// This is a pipeline breaker, so we replace the broadcast side with an ExchangeReceiver.
		bcChild := x.Children()[x.InnerChildIdx]
		exchangeSender := &ExchangeSender{exchangeType: tipb.ExchangeType_Broadcast}
		exchangeSender.InitBasePlan(ctx, plancodec.TypeExchangeSender)
		npf := &planFragment{p: bcChild, exchangeSender: exchangeSender}
		exchangeSender.SetChildren(npf.p)
		exchangeReceivers := &ExchangeReceiver{
			childPf: npf,
			schema:  bcChild.Schema(),
		}
		exchangeReceivers.InitBasePlan(ctx, plancodec.TypeExchangeReceiver)
		x.Children()[x.InnerChildIdx] = exchangeReceivers
		pf.exchangeReceivers = append(pf.exchangeReceivers, exchangeReceivers)
		// For the inner side of the join, we use a new plan fragment.
		getPlanFragments(ctx, bcChild, npf)
		getPlanFragments(ctx, x.Children()[1-x.InnerChildIdx], pf)
	default:
		if len(x.Children()) > 0 {
			getPlanFragments(ctx, x.Children()[0], pf)
		}
	}
}
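
// appendMPPDispatchReq serializes a fragment's plan (rooted at its
// ExchangeSender) into a DAG request and queues one dispatch request per task.
// The DAG is re-marshaled for every task because the table ID in the executor
// tree is rewritten to each task's physical table.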
func (e *MPPGather) appendMPPDispatchReq(pf *planFragment, tasks []*mppTask, isRoot bool) error {
	dagReq, _, err := constructDAGReq(e.ctx, []plannercore.PhysicalPlan{pf.exchangeSender}, kv.TiFlash)
	if err != nil {
		return errors.Trace(err)
	}
	for i := range pf.p.Schema().Columns {
		dagReq.OutputOffsets = append(dagReq.OutputOffsets, uint32(i))
	}
	// Intermediate fragments exchange data between TiFlash tasks in the CH block
	// format; only the root fragment, whose result is read back by TiDB, encodes chunks.
	if !isRoot {
		dagReq.EncodeType = tipb.EncodeType_TypeCHBlock
	} else {
		dagReq.EncodeType = tipb.EncodeType_TypeChunk
	}
	for _, mppTask := range tasks {
		err := updateExecutorTableID(context.Background(), dagReq.RootExecutor, mppTask.tableID, true)
		if err != nil {
			return errors.Trace(err)
		}
		pbData, err := dagReq.Marshal()
		if err != nil {
			return errors.Trace(err)
		}
		req := &kv.MPPDispatchRequest{
			Data:      pbData,
			Task:      mppTask.taskInfo,
			ID:        mppTask.id,
			IsRoot:    isRoot,
			Timeout:   10,
			SchemaVar: e.is.SchemaMetaVersion(),
			StartTs:   e.startTS,
		}
		e.mppReqs = append(e.mppReqs, req)
	}
	return nil
}
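
// constructMPPTasks builds the tasks for a plan fragment, recursively schedules
// the child fragments behind its exchange receivers, and then appends the
// dispatch requests for this fragment.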
func (e *MPPGather) constructMPPTasks(ctx context.Context, pf *planFragment, isRoot bool) ([]*mppTask, error) {
	tasks, err := e.constructMPPTasksImpl(ctx, pf)
	if err != nil {
		return nil, errors.Trace(err)
	}
	for _, client := range pf.exchangeReceivers {
		client.childPf.exchangeSender.tasks = tasks
		client.tasks, err = e.constructMPPTasks(ctx, client.childPf, false)
		if err != nil {
			return nil, errors.Trace(err)
		}
	}
	err = e.appendMPPDispatchReq(pf, tasks, isRoot)
	if err != nil {
		return nil, errors.Trace(err)
	}
	return tasks, nil
}

// Open decides the task counts and locations and generates exchange operators for every plan fragment.
// Then it dispatches the tasks to the TiFlash stores. If any task fails, it would cancel the rest of the tasks.
// TODO: We should retry when the request fails for pure rpc error.
func (e *MPPGather) Open(ctx context.Context) error {
	// The root consumer runs inside TiDB itself, marked by task id -1.
	tidbTask := &mppTask{
		startTs: e.startTS,
		id:      -1,
	}
	rootPf := &planFragment{
		p:              e.originalPlan,
		exchangeSender: &ExchangeSender{exchangeType: tipb.ExchangeType_PassThrough, tasks: []*mppTask{tidbTask}},
	}
	rootPf.exchangeSender.InitBasePlan(e.ctx, plancodec.TypeExchangeSender)
	rootPf.exchangeSender.SetChildren(rootPf.p)
	getPlanFragments(e.ctx, e.originalPlan, rootPf)
	_, err := e.constructMPPTasks(ctx, rootPf, true)
	if err != nil {
		return errors.Trace(err)
	}
	e.respIter, err = distsql.DispatchMPPTasks(ctx, e.ctx, e.mppReqs, e.retFieldTypes)
	if err != nil {
		return errors.Trace(err)
	}
	e.respIter.Fetch(ctx)
	return nil
}

// Next fills data into the chunk passed by its caller.
func (e *MPPGather) Next(ctx context.Context, chk *chunk.Chunk) error {
	err := e.respIter.Next(ctx, chk)
	return errors.Trace(err)
}

// Close releases the used resources.
func (e *MPPGather) Close() error {
	if e.respIter != nil {
		return e.respIter.Close()
	}
	return nil
}