mocktikv: move coprHandler to standalone file (#24067)
This commit is contained in:
65
store/mockstore/mocktikv/copr_handler.go
Normal file
65
store/mockstore/mocktikv/copr_handler.go
Normal file
@ -0,0 +1,65 @@
|
||||
// Copyright 2021 PingCAP, Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package mocktikv
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/pingcap/errors"
|
||||
"github.com/pingcap/kvproto/pkg/coprocessor"
|
||||
"github.com/pingcap/tidb/kv"
|
||||
"github.com/pingcap/tipb/go-tipb"
|
||||
)
|
||||
|
||||
type coprHandler struct {
|
||||
*Session
|
||||
}
|
||||
|
||||
func (h coprHandler) handleBatchCopRequest(ctx context.Context, req *coprocessor.BatchRequest) (*mockBatchCopDataClient, error) {
|
||||
client := &mockBatchCopDataClient{}
|
||||
for _, ri := range req.Regions {
|
||||
cop := coprocessor.Request{
|
||||
Tp: kv.ReqTypeDAG,
|
||||
Data: req.Data,
|
||||
StartTs: req.StartTs,
|
||||
Ranges: ri.Ranges,
|
||||
}
|
||||
_, exec, dagReq, err := h.buildDAGExecutor(&cop)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
chunk, err := drainRowsFromExecutor(ctx, exec, dagReq)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
client.chunks = append(client.chunks, chunk)
|
||||
}
|
||||
return client, nil
|
||||
}
|
||||
|
||||
func drainRowsFromExecutor(ctx context.Context, e executor, req *tipb.DAGRequest) (tipb.Chunk, error) {
|
||||
var chunk tipb.Chunk
|
||||
for {
|
||||
row, err := e.Next(ctx)
|
||||
if err != nil {
|
||||
return chunk, errors.Trace(err)
|
||||
}
|
||||
if row == nil {
|
||||
return chunk, nil
|
||||
}
|
||||
for _, offset := range req.OutputOffsets {
|
||||
chunk.RowsData = append(chunk.RowsData, row[offset]...)
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -33,7 +33,6 @@ import (
|
||||
"github.com/pingcap/parser/terror"
|
||||
"github.com/pingcap/tidb/kv"
|
||||
"github.com/pingcap/tidb/store/tikv/tikvrpc"
|
||||
"github.com/pingcap/tipb/go-tipb"
|
||||
)
|
||||
|
||||
// For gofail injection.
|
||||
@ -553,48 +552,6 @@ func (h kvHandler) handleSplitRegion(req *kvrpcpb.SplitRegionRequest) *kvrpcpb.S
|
||||
return resp
|
||||
}
|
||||
|
||||
func drainRowsFromExecutor(ctx context.Context, e executor, req *tipb.DAGRequest) (tipb.Chunk, error) {
|
||||
var chunk tipb.Chunk
|
||||
for {
|
||||
row, err := e.Next(ctx)
|
||||
if err != nil {
|
||||
return chunk, errors.Trace(err)
|
||||
}
|
||||
if row == nil {
|
||||
return chunk, nil
|
||||
}
|
||||
for _, offset := range req.OutputOffsets {
|
||||
chunk.RowsData = append(chunk.RowsData, row[offset]...)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type coprHandler struct {
|
||||
*Session
|
||||
}
|
||||
|
||||
func (h coprHandler) handleBatchCopRequest(ctx context.Context, req *coprocessor.BatchRequest) (*mockBatchCopDataClient, error) {
|
||||
client := &mockBatchCopDataClient{}
|
||||
for _, ri := range req.Regions {
|
||||
cop := coprocessor.Request{
|
||||
Tp: kv.ReqTypeDAG,
|
||||
Data: req.Data,
|
||||
StartTs: req.StartTs,
|
||||
Ranges: ri.Ranges,
|
||||
}
|
||||
_, exec, dagReq, err := h.buildDAGExecutor(&cop)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
chunk, err := drainRowsFromExecutor(ctx, exec, dagReq)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
client.chunks = append(client.chunks, chunk)
|
||||
}
|
||||
return client, nil
|
||||
}
|
||||
|
||||
// Client is a client that sends RPC.
|
||||
// This is same with tikv.Client, define again for avoid circle import.
|
||||
type Client interface {
|
||||
|
||||
Reference in New Issue
Block a user