diff --git a/store/mockstore/mocktikv/copr_handler.go b/store/mockstore/mocktikv/copr_handler.go new file mode 100644 index 0000000000..ea9b16acd8 --- /dev/null +++ b/store/mockstore/mocktikv/copr_handler.go @@ -0,0 +1,65 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package mocktikv + +import ( + "context" + + "github.com/pingcap/errors" + "github.com/pingcap/kvproto/pkg/coprocessor" + "github.com/pingcap/tidb/kv" + "github.com/pingcap/tipb/go-tipb" +) + +type coprHandler struct { + *Session +} + +func (h coprHandler) handleBatchCopRequest(ctx context.Context, req *coprocessor.BatchRequest) (*mockBatchCopDataClient, error) { + client := &mockBatchCopDataClient{} + for _, ri := range req.Regions { + cop := coprocessor.Request{ + Tp: kv.ReqTypeDAG, + Data: req.Data, + StartTs: req.StartTs, + Ranges: ri.Ranges, + } + _, exec, dagReq, err := h.buildDAGExecutor(&cop) + if err != nil { + return nil, errors.Trace(err) + } + chunk, err := drainRowsFromExecutor(ctx, exec, dagReq) + if err != nil { + return nil, errors.Trace(err) + } + client.chunks = append(client.chunks, chunk) + } + return client, nil +} + +func drainRowsFromExecutor(ctx context.Context, e executor, req *tipb.DAGRequest) (tipb.Chunk, error) { + var chunk tipb.Chunk + for { + row, err := e.Next(ctx) + if err != nil { + return chunk, errors.Trace(err) + } + if row == nil { + return chunk, nil + } + for _, offset := range req.OutputOffsets { + chunk.RowsData = append(chunk.RowsData, row[offset]...) + } + } +} diff --git a/store/mockstore/mocktikv/rpc.go b/store/mockstore/mocktikv/rpc.go index 30ea0a969c..4ba4da6e60 100644 --- a/store/mockstore/mocktikv/rpc.go +++ b/store/mockstore/mocktikv/rpc.go @@ -33,7 +33,6 @@ import ( "github.com/pingcap/parser/terror" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/store/tikv/tikvrpc" - "github.com/pingcap/tipb/go-tipb" ) // For gofail injection. @@ -553,48 +552,6 @@ func (h kvHandler) handleSplitRegion(req *kvrpcpb.SplitRegionRequest) *kvrpcpb.S return resp } -func drainRowsFromExecutor(ctx context.Context, e executor, req *tipb.DAGRequest) (tipb.Chunk, error) { - var chunk tipb.Chunk - for { - row, err := e.Next(ctx) - if err != nil { - return chunk, errors.Trace(err) - } - if row == nil { - return chunk, nil - } - for _, offset := range req.OutputOffsets { - chunk.RowsData = append(chunk.RowsData, row[offset]...) - } - } -} - -type coprHandler struct { - *Session -} - -func (h coprHandler) handleBatchCopRequest(ctx context.Context, req *coprocessor.BatchRequest) (*mockBatchCopDataClient, error) { - client := &mockBatchCopDataClient{} - for _, ri := range req.Regions { - cop := coprocessor.Request{ - Tp: kv.ReqTypeDAG, - Data: req.Data, - StartTs: req.StartTs, - Ranges: ri.Ranges, - } - _, exec, dagReq, err := h.buildDAGExecutor(&cop) - if err != nil { - return nil, errors.Trace(err) - } - chunk, err := drainRowsFromExecutor(ctx, exec, dagReq) - if err != nil { - return nil, errors.Trace(err) - } - client.chunks = append(client.chunks, chunk) - } - return client, nil -} - // Client is a client that sends RPC. // This is same with tikv.Client, define again for avoid circle import. type Client interface {