// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package executor

import (
	"context"

	"github.com/gogo/protobuf/proto"
	"github.com/pingcap/errors"
	"github.com/pingcap/kvproto/pkg/coprocessor"
	"github.com/pingcap/kvproto/pkg/tikvpb"
	"github.com/pingcap/tidb/pkg/executor/internal/exec"
	"github.com/pingcap/tidb/pkg/infoschema"
	"github.com/pingcap/tidb/pkg/kv"
	"github.com/pingcap/tidb/pkg/parser/auth"
	"github.com/pingcap/tidb/pkg/planner/core"
	"github.com/pingcap/tidb/pkg/privilege"
	"github.com/pingcap/tidb/pkg/sessionctx"
	"github.com/pingcap/tidb/pkg/types"
	"github.com/pingcap/tidb/pkg/util/chunk"
	"github.com/pingcap/tidb/pkg/util/codec"
	"github.com/pingcap/tidb/pkg/util/logutil"
	"github.com/pingcap/tidb/pkg/util/timeutil"
	"github.com/pingcap/tidb/pkg/util/tracing"
	"github.com/pingcap/tipb/go-tipb"
)

// copHandlerCtx attaches the trace information carried in the coprocessor
// request (connection ID and session alias) to the context so that tracing
// and logging can correlate the work with the originating statement.
func copHandlerCtx(ctx context.Context, req *coprocessor.Request) context.Context {
	source := req.Context.SourceStmt
	if source == nil {
		return ctx
	}

	traceInfo := &tracing.TraceInfo{
		ConnectionID: source.ConnectionId,
		SessionAlias: source.SessionAlias,
	}

	ctx = tracing.ContextWithTraceInfo(ctx, traceInfo)
	ctx = logutil.WithTraceFields(ctx, traceInfo)
	return ctx
}

// CoprocessorDAGHandler is used to handle the coprocessor DAG request.
type CoprocessorDAGHandler struct {
	sctx   sessionctx.Context
	dagReq *tipb.DAGRequest
}

// NewCoprocessorDAGHandler creates a new CoprocessorDAGHandler.
func NewCoprocessorDAGHandler(sctx sessionctx.Context) *CoprocessorDAGHandler {
	return &CoprocessorDAGHandler{
		sctx: sctx,
	}
}

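// A coprocessor endpoint embedded in TiDB can drive this handler by building
// one handler per request and calling HandleRequest (or HandleStreamRequest
// for the streaming API). The sketch below is illustrative only; how the
// sessionctx.Context and the incoming *coprocessor.Request are obtained is
// server wiring that lives outside this file:
//
//	func serveCopRequest(ctx context.Context, sctx sessionctx.Context, req *coprocessor.Request) *coprocessor.Response {
//		h := NewCoprocessorDAGHandler(sctx)
//		return h.HandleRequest(ctx, req)
//	}
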
// HandleRequest handles the coprocessor request.
func (h *CoprocessorDAGHandler) HandleRequest(ctx context.Context, req *coprocessor.Request) *coprocessor.Response {
	ctx = copHandlerCtx(ctx, req)

	e, err := h.buildDAGExecutor(ctx, req)
	if err != nil {
		return h.buildErrorResponse(err)
	}

	err = exec.Open(ctx, e)
	if err != nil {
		return h.buildErrorResponse(err)
	}

	chk := exec.TryNewCacheChunk(e)
	tps := e.RetFieldTypes()
	var totalChunks, partChunks []tipb.Chunk
	memTracker := h.sctx.GetSessionVars().StmtCtx.MemTracker
	for {
		chk.Reset()
		err = exec.Next(ctx, e, chk)
		if err != nil {
			return h.buildErrorResponse(err)
		}
		if chk.NumRows() == 0 {
			break
		}
		partChunks, err = h.buildChunk(chk, tps)
		if err != nil {
			return h.buildErrorResponse(err)
		}
		for _, ch := range partChunks {
			memTracker.Consume(int64(ch.Size()))
		}
		totalChunks = append(totalChunks, partChunks...)
	}
	if err := exec.Close(e); err != nil {
		return h.buildErrorResponse(err)
	}
	return h.buildUnaryResponse(totalChunks)
}

// HandleStreamRequest handles the coprocessor stream request.
func (h *CoprocessorDAGHandler) HandleStreamRequest(ctx context.Context, req *coprocessor.Request, stream tikvpb.Tikv_CoprocessorStreamServer) error {
	ctx = copHandlerCtx(ctx, req)
	logutil.Logger(ctx).Debug("handle coprocessor stream request")

	e, err := h.buildDAGExecutor(ctx, req)
	if err != nil {
		return stream.Send(h.buildErrorResponse(err))
	}

	err = exec.Open(ctx, e)
	if err != nil {
		return stream.Send(h.buildErrorResponse(err))
	}

	chk := exec.TryNewCacheChunk(e)
	tps := e.RetFieldTypes()
	for {
		chk.Reset()
		if err = exec.Next(ctx, e, chk); err != nil {
			return stream.Send(h.buildErrorResponse(err))
		}
		if chk.NumRows() == 0 {
			return h.buildResponseAndSendToStream(chk, tps, stream)
		}
		if err = h.buildResponseAndSendToStream(chk, tps, stream); err != nil {
			return stream.Send(h.buildErrorResponse(err))
		}
	}
}

// buildResponseAndSendToStream encodes one executor chunk and sends the
// resulting tipb.Chunk values to the stream one response at a time.
func (h *CoprocessorDAGHandler) buildResponseAndSendToStream(chk *chunk.Chunk, tps []*types.FieldType, stream tikvpb.Tikv_CoprocessorStreamServer) error {
	chunks, err := h.buildChunk(chk, tps)
	if err != nil {
		return stream.Send(h.buildErrorResponse(err))
	}

	for i := range chunks {
		resp := h.buildStreamResponse(&chunks[i])
		if err = stream.Send(resp); err != nil {
			return err
		}
	}
	return nil
}

// buildDAGExecutor unmarshals the tipb.DAGRequest carried in the coprocessor
// request, restores the requesting user and time zone on the session, builds
// the physical plan from the serialized executors, and turns the plan into an
// executor tree.
func (h *CoprocessorDAGHandler) buildDAGExecutor(ctx context.Context, req *coprocessor.Request) (exec.Executor, error) {
	if req.GetTp() != kv.ReqTypeDAG {
		return nil, errors.Errorf("unsupported request type %d", req.GetTp())
	}
	dagReq := new(tipb.DAGRequest)
	err := proto.Unmarshal(req.Data, dagReq)
	if err != nil {
		return nil, errors.Trace(err)
	}

	if dagReq.User != nil {
		pm := privilege.GetPrivilegeManager(h.sctx)
		if pm != nil {
			h.sctx.GetSessionVars().User = &auth.UserIdentity{
				Username: dagReq.User.UserName,
				Hostname: dagReq.User.UserHost,
			}
			authName, authHost, success := pm.MatchIdentity(ctx, dagReq.User.UserName, dagReq.User.UserHost, false)
			if success && pm.GetAuthWithoutVerification(authName, authHost) {
				h.sctx.GetSessionVars().User.AuthUsername = authName
				h.sctx.GetSessionVars().User.AuthHostname = authHost
				h.sctx.GetSessionVars().ActiveRoles = pm.GetDefaultRoles(ctx, authName, authHost)
			}
		}
	}

	stmtCtx := h.sctx.GetSessionVars().StmtCtx

	tz, err := timeutil.ConstructTimeZone(dagReq.TimeZoneName, int(dagReq.TimeZoneOffset))
	if err != nil {
		return nil, errors.Trace(err)
	}
	h.sctx.GetSessionVars().TimeZone = tz
	stmtCtx.InitFromPBFlagAndTz(dagReq.Flags, tz)

	h.dagReq = dagReq
	is := h.sctx.GetInfoSchema().(infoschema.InfoSchema)
	// Build physical plan.
	bp := core.NewPBPlanBuilder(h.sctx.GetPlanCtx(), is, req.Ranges)
	plan, err := bp.Build(dagReq.Executors)
	if err != nil {
		return nil, errors.Trace(err)
	}
	plan = core.InjectExtraProjection(plan)
	// Build executor.
	b := newExecutorBuilder(h.sctx, is, nil)
	return b.build(plan), nil
}

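// For reference, the request that buildDAGExecutor expects carries a
// proto-marshaled tipb.DAGRequest plus the DAG request type. A minimal,
// illustrative construction (the executors, offsets, and encode type are
// placeholders, not a complete plan) would look roughly like:
//
//	dag := &tipb.DAGRequest{ /* Executors, OutputOffsets, EncodeType, ... */ }
//	data, err := proto.Marshal(dag)
//	if err != nil { /* handle error */ }
//	req := &coprocessor.Request{
//		Tp:   kv.ReqTypeDAG,
//		Data: data,
//	}
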
// buildChunk encodes one executor chunk into tipb.Chunk values using the
// encoding requested by the DAG request.
func (h *CoprocessorDAGHandler) buildChunk(chk *chunk.Chunk, tps []*types.FieldType) (chunks []tipb.Chunk, err error) {
	switch h.dagReq.EncodeType {
	case tipb.EncodeType_TypeDefault:
		chunks, err = h.encodeDefault(chk, tps)
	case tipb.EncodeType_TypeChunk:
		chunks, err = h.encodeChunk(chk, tps)
	default:
		return nil, errors.Errorf("unknown DAG encode type: %v", h.dagReq.EncodeType)
	}
	return chunks, err
}

// buildUnaryResponse wraps the encoded chunks in a tipb.SelectResponse and
// marshals it into a single coprocessor.Response.
func (h *CoprocessorDAGHandler) buildUnaryResponse(chunks []tipb.Chunk) *coprocessor.Response {
	selResp := tipb.SelectResponse{
		Chunks:     chunks,
		EncodeType: h.dagReq.EncodeType,
	}
	if h.dagReq.GetCollectExecutionSummaries() {
		execSummary := make([]*tipb.ExecutorExecutionSummary, len(h.dagReq.Executors))
		for i := range execSummary {
			// TODO: Add real executor execution summary information.
			execSummary[i] = &tipb.ExecutorExecutionSummary{}
		}
		selResp.ExecutionSummaries = execSummary
	}
	data, err := proto.Marshal(&selResp)
	if err != nil {
		return h.buildErrorResponse(err)
	}
	return &coprocessor.Response{
		Data: data,
	}
}

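// The unary response built above carries a marshaled tipb.SelectResponse in
// coprocessor.Response.Data, so a caller can recover the encoded rows roughly
// like this (illustrative sketch):
//
//	var selResp tipb.SelectResponse
//	if err := proto.Unmarshal(resp.Data, &selResp); err != nil { /* handle error */ }
//	// selResp.Chunks holds the encoded rows; selResp.EncodeType records
//	// whether they use the default or the chunk encoding.
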
// buildStreamResponse wraps a single encoded chunk in a tipb.StreamResponse
// and marshals it into a coprocessor.Response.
func (h *CoprocessorDAGHandler) buildStreamResponse(chunk *tipb.Chunk) *coprocessor.Response {
	data, err := chunk.Marshal()
	if err != nil {
		return h.buildErrorResponse(err)
	}
	streamResponse := tipb.StreamResponse{
		Data: data,
	}
	var resp = &coprocessor.Response{}
	resp.Data, err = proto.Marshal(&streamResponse)
	if err != nil {
		resp.OtherError = err.Error()
	}
	return resp
}

// buildErrorResponse reports the error (with its stack) through the
// OtherError field of the coprocessor response.
func (*CoprocessorDAGHandler) buildErrorResponse(err error) *coprocessor.Response {
	return &coprocessor.Response{
		OtherError: errors.ErrorStack(err),
	}
}

// encodeChunk encodes the requested output columns with the chunk codec and
// returns them as a single tipb.Chunk.
func (h *CoprocessorDAGHandler) encodeChunk(chk *chunk.Chunk, colTypes []*types.FieldType) ([]tipb.Chunk, error) {
	colOrdinal := h.dagReq.OutputOffsets
	respColTypes := make([]*types.FieldType, 0, len(colOrdinal))
	for _, ordinal := range colOrdinal {
		respColTypes = append(respColTypes, colTypes[ordinal])
	}
	encoder := chunk.NewCodec(respColTypes)
	cur := tipb.Chunk{}
	cur.RowsData = append(cur.RowsData, encoder.Encode(chk)...)
	return []tipb.Chunk{cur}, nil
}

// encodeDefault encodes the requested output columns row by row in the
// default (datum) format, packing rowsPerChunk rows into each tipb.Chunk.
func (h *CoprocessorDAGHandler) encodeDefault(chk *chunk.Chunk, tps []*types.FieldType) ([]tipb.Chunk, error) {
	colOrdinal := h.dagReq.OutputOffsets
	stmtCtx := h.sctx.GetSessionVars().StmtCtx
	requestedRow := make([]byte, 0)
	chunks := []tipb.Chunk{}
	errCtx := stmtCtx.ErrCtx()
	for i := range chk.NumRows() {
		requestedRow = requestedRow[:0]
		row := chk.GetRow(i)
		for _, ordinal := range colOrdinal {
			data, err := codec.EncodeValue(stmtCtx.TimeZone(), nil, row.GetDatum(int(ordinal), tps[ordinal]))
			err = errCtx.HandleError(err)
			if err != nil {
				return nil, err
			}
			requestedRow = append(requestedRow, data...)
		}
		chunks = h.appendRow(chunks, requestedRow, i)
	}
	return chunks, nil
}

// rowsPerChunk is the number of encoded rows packed into one tipb.Chunk of a
// default-encoded response.
const rowsPerChunk = 64

// appendRow appends one encoded row to the current tipb.Chunk, starting a new
// chunk every rowsPerChunk rows.
func (*CoprocessorDAGHandler) appendRow(chunks []tipb.Chunk, data []byte, rowCnt int) []tipb.Chunk {
	if rowCnt%rowsPerChunk == 0 {
		chunks = append(chunks, tipb.Chunk{})
	}
	cur := &chunks[len(chunks)-1]
	cur.RowsData = append(cur.RowsData, data...)
	return chunks
}
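
// For example, feeding n encoded rows through appendRow groups them 64 to a
// chunk: rows 0..63 land in chunks[0], rows 64..127 in chunks[1], and so on.
// Illustrative sketch (rowData is a placeholder for the encoded rows):
//
//	var chunks []tipb.Chunk
//	for i, data := range rowData {
//		chunks = h.appendRow(chunks, data, i)
//	}
//	// 100 rows yield len(chunks) == 2: 64 rows in chunks[0], 36 in chunks[1].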