store, tikv: let client of mock-tikv support dag req. (#3172)
This commit is contained in:
@ -42,6 +42,8 @@ func (c *CopClient) SupportRequestType(reqType, subType int64) bool {
|
||||
default:
|
||||
return supportExpr(tipb.ExprType(subType))
|
||||
}
|
||||
case kv.ReqTypeDAG:
|
||||
return c.store.mock
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
@ -105,6 +105,7 @@ type tikvStore struct {
|
||||
lockResolver *LockResolver
|
||||
gcWorker *GCWorker
|
||||
etcdAddrs []string
|
||||
mock bool
|
||||
}
|
||||
|
||||
func newTikvStore(uuid string, pdClient pd.Client, client Client, enableGC bool) (*tikvStore, error) {
|
||||
@ -112,13 +113,14 @@ func newTikvStore(uuid string, pdClient pd.Client, client Client, enableGC bool)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
|
||||
_, mock := client.(*mocktikv.RPCClient)
|
||||
store := &tikvStore{
|
||||
clusterID: pdClient.GetClusterID(goctx.TODO()),
|
||||
uuid: uuid,
|
||||
oracle: oracle,
|
||||
client: client,
|
||||
regionCache: NewRegionCache(pdClient),
|
||||
mock: mock,
|
||||
}
|
||||
store.lockResolver = newLockResolver(store)
|
||||
if enableGC {
|
||||
|
||||
@ -15,13 +15,11 @@ package tikv
|
||||
|
||||
import (
|
||||
"container/list"
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"github.com/juju/errors"
|
||||
"github.com/ngaut/log"
|
||||
"github.com/pingcap/kvproto/pkg/kvrpcpb"
|
||||
"github.com/pingcap/pd/pd-client"
|
||||
goctx "golang.org/x/net/context"
|
||||
)
|
||||
|
||||
@ -47,20 +45,6 @@ func newLockResolver(store *tikvStore) *LockResolver {
|
||||
return r
|
||||
}
|
||||
|
||||
// NewLockResolver creates a LockResolver.
|
||||
func NewLockResolver(etcdAddrs []string) (*LockResolver, error) {
|
||||
pdCli, err := pd.NewClient(etcdAddrs)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
uuid := fmt.Sprintf("tikv-%v", pdCli.GetClusterID(goctx.TODO()))
|
||||
s, err := newTikvStore(uuid, &codecPDClient{pdCli}, newRPCClient(), false)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
return s.lockResolver, nil
|
||||
}
|
||||
|
||||
// TxnStatus represents a txn's final status. It should be Commit or Rollback.
|
||||
type TxnStatus uint64
|
||||
|
||||
|
||||
@ -30,13 +30,9 @@ var _ = Suite(&testSplitSuite{})
|
||||
|
||||
func (s *testSplitSuite) SetUpTest(c *C) {
|
||||
s.cluster = mocktikv.NewCluster()
|
||||
mocktikv.BootstrapWithSingleStore(s.cluster)
|
||||
mvccStore := mocktikv.NewMvccStore()
|
||||
client := mocktikv.NewRPCClient(s.cluster, mvccStore)
|
||||
pdCli := &codecPDClient{mocktikv.NewPDClient(s.cluster)}
|
||||
store, err := newTikvStore("mock-tikv-store", pdCli, client, false)
|
||||
c.Assert(err, IsNil)
|
||||
s.store = store
|
||||
store, err := NewMockTikvStoreWithCluster(s.cluster)
|
||||
c.Check(err, IsNil)
|
||||
s.store = store.(*tikvStore)
|
||||
s.bo = NewBackoffer(5000, context.Background())
|
||||
}
|
||||
|
||||
|
||||
@ -39,13 +39,9 @@ var _ = Suite(&testStoreSuite{})
|
||||
|
||||
func (s *testStoreSuite) SetUpTest(c *C) {
|
||||
s.cluster = mocktikv.NewCluster()
|
||||
mocktikv.BootstrapWithSingleStore(s.cluster)
|
||||
mvccStore := mocktikv.NewMvccStore()
|
||||
clientFactory := mocktikv.NewRPCClient(s.cluster, mvccStore)
|
||||
pdCli := &codecPDClient{mocktikv.NewPDClient(s.cluster)}
|
||||
store, err := newTikvStore("mock-tikv-store", pdCli, clientFactory, false)
|
||||
c.Assert(err, IsNil)
|
||||
s.store = store
|
||||
store, err := NewMockTikvStoreWithCluster(s.cluster)
|
||||
c.Check(err, IsNil)
|
||||
s.store = store.(*tikvStore)
|
||||
}
|
||||
|
||||
func (s *testStoreSuite) TestParsePath(c *C) {
|
||||
|
||||
Reference in New Issue
Block a user