Files
tidb/executor/coprocessor.go

184 lines
5.1 KiB
Go

// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package executor
import (
"context"
"github.com/gogo/protobuf/proto"
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/coprocessor"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/timeutil"
"github.com/pingcap/tipb/go-tipb"
)
// CoprocessorDAGHandler uses to handle cop dag request.
type CoprocessorDAGHandler struct {
sctx sessionctx.Context
resp *coprocessor.Response
selResp *tipb.SelectResponse
dagReq *tipb.DAGRequest
}
// NewCoprocessorDAGHandler creates a new CoprocessorDAGHandler.
func NewCoprocessorDAGHandler(sctx sessionctx.Context) *CoprocessorDAGHandler {
return &CoprocessorDAGHandler{
sctx: sctx,
resp: &coprocessor.Response{},
selResp: &tipb.SelectResponse{},
}
}
// HandleRequest handles the coprocessor request.
func (h *CoprocessorDAGHandler) HandleRequest(ctx context.Context, req *coprocessor.Request) *coprocessor.Response {
e, err := h.buildDAGExecutor(req)
if err != nil {
return h.buildResponse(err)
}
err = e.Open(ctx)
if err != nil {
return h.buildResponse(err)
}
chk := newFirstChunk(e)
tps := e.base().retFieldTypes
for {
chk.Reset()
err = Next(ctx, e, chk)
if err != nil {
break
}
if chk.NumRows() == 0 {
break
}
err = h.appendChunk(chk, tps)
if err != nil {
break
}
}
return h.buildResponse(err)
}
func (h *CoprocessorDAGHandler) buildDAGExecutor(req *coprocessor.Request) (Executor, error) {
if req.GetTp() != kv.ReqTypeDAG {
return nil, errors.Errorf("unsupported request type %d", req.GetTp())
}
dagReq := new(tipb.DAGRequest)
err := proto.Unmarshal(req.Data, dagReq)
if err != nil {
return nil, errors.Trace(err)
}
stmtCtx := h.sctx.GetSessionVars().StmtCtx
stmtCtx.SetFlagsFromPBFlag(dagReq.Flags)
stmtCtx.TimeZone, err = timeutil.ConstructTimeZone(dagReq.TimeZoneName, int(dagReq.TimeZoneOffset))
if err != nil {
return nil, errors.Trace(err)
}
h.dagReq = dagReq
is := h.sctx.GetSessionVars().TxnCtx.InfoSchema.(infoschema.InfoSchema)
// Build physical plan.
bp := core.NewPBPlanBuilder(h.sctx, is)
plan, err := bp.Build(dagReq.Executors)
if err != nil {
return nil, errors.Trace(err)
}
// Build executor.
b := newExecutorBuilder(h.sctx, is)
return b.build(plan), nil
}
func (h *CoprocessorDAGHandler) appendChunk(chk *chunk.Chunk, tps []*types.FieldType) error {
var err error
switch h.dagReq.EncodeType {
case tipb.EncodeType_TypeDefault:
err = h.encodeDefault(chk, tps)
case tipb.EncodeType_TypeChunk:
err = h.encodeChunk(chk, tps)
default:
return errors.Errorf("unknown DAG encode type: %v", h.dagReq.EncodeType)
}
return err
}
func (h *CoprocessorDAGHandler) buildResponse(err error) *coprocessor.Response {
if err != nil {
h.resp.OtherError = err.Error()
return h.resp
}
h.selResp.EncodeType = h.dagReq.EncodeType
data, err := proto.Marshal(h.selResp)
if err != nil {
h.resp.OtherError = err.Error()
return h.resp
}
h.resp.Data = data
return h.resp
}
func (h *CoprocessorDAGHandler) encodeChunk(chk *chunk.Chunk, colTypes []*types.FieldType) error {
colOrdinal := h.dagReq.OutputOffsets
chunks := h.selResp.Chunks
respColTypes := make([]*types.FieldType, 0, len(colOrdinal))
for _, ordinal := range colOrdinal {
respColTypes = append(respColTypes, colTypes[ordinal])
}
encoder := chunk.NewCodec(respColTypes)
chunks = append(chunks, tipb.Chunk{})
cur := &chunks[len(chunks)-1]
cur.RowsData = append(cur.RowsData, encoder.Encode(chk)...)
h.selResp.Chunks = chunks
return nil
}
func (h *CoprocessorDAGHandler) encodeDefault(chk *chunk.Chunk, tps []*types.FieldType) error {
colOrdinal := h.dagReq.OutputOffsets
chunks := h.selResp.Chunks
stmtCtx := h.sctx.GetSessionVars().StmtCtx
requestedRow := make([]byte, 0)
for i := 0; i < chk.NumRows(); i++ {
requestedRow = requestedRow[:0]
row := chk.GetRow(i)
for _, ordinal := range colOrdinal {
data, err := codec.EncodeValue(stmtCtx, nil, row.GetDatum(int(ordinal), tps[ordinal]))
if err != nil {
return err
}
requestedRow = append(requestedRow, data...)
}
chunks = h.appendRow(chunks, requestedRow, i)
}
h.selResp.Chunks = chunks
return nil
}
const rowsPerChunk = 64
func (h *CoprocessorDAGHandler) appendRow(chunks []tipb.Chunk, data []byte, rowCnt int) []tipb.Chunk {
if rowCnt%rowsPerChunk == 0 {
chunks = append(chunks, tipb.Chunk{})
}
cur := &chunks[len(chunks)-1]
cur.RowsData = append(cur.RowsData, data...)
return chunks
}