dxfservice: create store for SYSTEM keyspace (#61752)
ref pingcap/tidb#61702
This commit is contained in:
@ -46,7 +46,7 @@ func (Glue) Open(path string, option pd.SecurityOption) (kv.Storage, error) {
|
||||
conf.Security.ClusterSSLKey = option.KeyPath
|
||||
config.StoreGlobalConfig(conf)
|
||||
}
|
||||
return driver.TiKVDriver{}.Open(path)
|
||||
return (&driver.TiKVDriver{}).Open(path)
|
||||
}
|
||||
|
||||
// OwnsStorage implements glue.Glue.
|
||||
|
||||
@ -60,7 +60,7 @@ func main() {
|
||||
flag.PrintDefaults()
|
||||
err := logutil.InitLogger(logutil.NewLogConfig(*logLevel, logutil.DefaultLogFormat, "", "", logutil.EmptyFileLogConfig, false))
|
||||
terror.MustNil(err)
|
||||
err = store.Register(config.StoreTypeTiKV, driver.TiKVDriver{})
|
||||
err = store.Register(config.StoreTypeTiKV, &driver.TiKVDriver{})
|
||||
terror.MustNil(err)
|
||||
ut := newBenchDB()
|
||||
works := strings.Split(*runJobs, "|")
|
||||
|
||||
@ -1163,5 +1163,5 @@ func addEnvPath(newPath string) {
|
||||
}
|
||||
|
||||
func init() {
|
||||
_ = store.Register(config.StoreTypeTiKV, tidbdriver.TiKVDriver{})
|
||||
_ = store.Register(config.StoreTypeTiKV, &tidbdriver.TiKVDriver{})
|
||||
}
|
||||
|
||||
@ -407,7 +407,7 @@ func setCPUAffinity() {
|
||||
}
|
||||
|
||||
func registerStores() {
|
||||
err := kvstore.Register(config.StoreTypeTiKV, driver.TiKVDriver{})
|
||||
err := kvstore.Register(config.StoreTypeTiKV, &driver.TiKVDriver{})
|
||||
terror.MustNil(err)
|
||||
err = kvstore.Register(config.StoreTypeMockTiKV, mockstore.MockTiKVDriver{})
|
||||
terror.MustNil(err)
|
||||
@ -416,16 +416,7 @@ func registerStores() {
|
||||
}
|
||||
|
||||
func createStoreDDLOwnerMgrAndDomain(keyspaceName string) (kv.Storage, *domain.Domain) {
|
||||
cfg := config.GetGlobalConfig()
|
||||
var fullPath string
|
||||
if keyspaceName == "" {
|
||||
fullPath = fmt.Sprintf("%s://%s", cfg.Store, cfg.Path)
|
||||
} else {
|
||||
fullPath = fmt.Sprintf("%s://%s?keyspaceName=%s", cfg.Store, cfg.Path, keyspaceName)
|
||||
}
|
||||
var err error
|
||||
storage, err := kvstore.New(fullPath)
|
||||
terror.MustNil(err)
|
||||
storage := kvstore.MustInitStorage(keyspaceName)
|
||||
if tikvStore, ok := storage.(kv.StorageWithPD); ok {
|
||||
pdhttpCli := tikvStore.GetPDHTTPClient()
|
||||
// unistore also implements kv.StorageWithPD, but it does not have PD client.
|
||||
@ -441,7 +432,7 @@ func createStoreDDLOwnerMgrAndDomain(keyspaceName string) (kv.Storage, *domain.D
|
||||
copr.GlobalMPPFailedStoreProber.Run()
|
||||
mppcoordmanager.InstanceMPPCoordinatorManager.Run()
|
||||
// Bootstrap a session to load information schema.
|
||||
err = ddl.StartOwnerManager(context.Background(), storage)
|
||||
err := ddl.StartOwnerManager(context.Background(), storage)
|
||||
terror.MustNil(err)
|
||||
dom, err := session.BootstrapSession(storage)
|
||||
terror.MustNil(err)
|
||||
@ -934,6 +925,10 @@ func closeDDLOwnerMgrDomainAndStorage(storage kv.Storage, dom *domain.Domain) {
|
||||
mppcoordmanager.InstanceMPPCoordinatorManager.Stop()
|
||||
err := storage.Close()
|
||||
terror.Log(errors.Trace(err))
|
||||
if kerneltype.IsNextGen() && keyspace.IsRunningOnUser() {
|
||||
err = kvstore.GetSystemStorage().Close()
|
||||
terror.Log(errors.Annotate(err, "close system storage"))
|
||||
}
|
||||
}
|
||||
|
||||
// The amount of time we wait for the ongoing txt to finished.
|
||||
|
||||
@ -1351,7 +1351,7 @@ func (rc *Controller) importTables(ctx context.Context) (finalErr error) {
|
||||
u = strings.TrimPrefix(u, "https://")
|
||||
urlsWithoutScheme = append(urlsWithoutScheme, u)
|
||||
}
|
||||
kvStore, err = driver.TiKVDriver{}.OpenWithOptions(
|
||||
kvStore, err = (&driver.TiKVDriver{}).OpenWithOptions(
|
||||
fmt.Sprintf(
|
||||
"tikv://%s?disableGC=true&keyspaceName=%s",
|
||||
strings.Join(urlsWithoutScheme, ","), rc.keyspaceName,
|
||||
|
||||
@ -134,8 +134,8 @@ func NewMockDomain() *Domain {
|
||||
return do
|
||||
}
|
||||
|
||||
// Domain represents a storage space. Different domains can use the same database name.
|
||||
// Multiple domains can be used in parallel without synchronization.
|
||||
// Domain manages life cycle of nearly all other components related to SQL execution
|
||||
// of a TiDB instance, only one domain can exist at a time.
|
||||
type Domain struct {
|
||||
store kv.Storage
|
||||
infoCache *infoschema.InfoCache
|
||||
|
||||
@ -2,7 +2,10 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
|
||||
|
||||
go_library(
|
||||
name = "keyspace",
|
||||
srcs = ["keyspace.go"],
|
||||
srcs = [
|
||||
"doc.go",
|
||||
"keyspace.go",
|
||||
],
|
||||
importpath = "github.com/pingcap/tidb/pkg/keyspace",
|
||||
visibility = ["//visibility:public"],
|
||||
deps = [
|
||||
|
||||
35
pkg/keyspace/doc.go
Normal file
35
pkg/keyspace/doc.go
Normal file
@ -0,0 +1,35 @@
|
||||
// Copyright 2025 PingCAP, Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
// Package keyspace provides utilities for keyspace for nextgen TiDB.
|
||||
//
|
||||
// Keyspace are used to isolate data and operations, allowing for multi-tenancy
|
||||
// in next generation TiDB. Each keyspace represents a logical cluster on top of
|
||||
// the underlying physical cluster.
|
||||
//
|
||||
// There is a special keyspace named "SYSTEM", which is reserved for system-level
|
||||
// services and data, currently, only the DXF service uses this keyspace.
|
||||
// As user keyspace depends on SYSTEM keyspace, we need to make sure SYSTEM
|
||||
// keyspace exist before user keyspace start serving any user traffic. So for the
|
||||
// deployment of nextgen cluster, we need to:
|
||||
// - Deploy PD/TiKV and other components, wait them to be ready to serve TiDB access.
|
||||
// - Deploy SYSTEM keyspace, wait it fully bootstrapped.
|
||||
// - Deploy other user keyspace, they can be deployed concurrently.
|
||||
//
|
||||
// During upgrade, we also need to follow above order, i.e. We need to upgrade
|
||||
// the SYSTEM keyspace first, then user keyspace.
|
||||
//
|
||||
// Note: serverless also use keyspace, and have the special NULL and DEFAULT
|
||||
// keyspace, while nextgen hasn't.
|
||||
package keyspace
|
||||
@ -27,6 +27,9 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
// System is the keyspace name for SYSTEM keyspace.
|
||||
// see doc.go for more detail.
|
||||
System = "SYSTEM"
|
||||
// tidbKeyspaceEtcdPathPrefix is the keyspace prefix for etcd namespace
|
||||
tidbKeyspaceEtcdPathPrefix = "/keyspaces/tidb/"
|
||||
)
|
||||
@ -87,3 +90,8 @@ func WrapZapcoreWithKeyspace() zap.Option {
|
||||
return core
|
||||
})
|
||||
}
|
||||
|
||||
// IsRunningOnUser checks if keyspace of current instance is a user keyspace.
|
||||
func IsRunningOnUser() bool {
|
||||
return config.GetGlobalKeyspaceName() != System
|
||||
}
|
||||
|
||||
@ -279,6 +279,10 @@ func (s *mockStorage) GetMinSafeTS(txnScope string) uint64 {
|
||||
return 0
|
||||
}
|
||||
|
||||
func (s *mockStorage) GetClusterID() uint64 {
|
||||
return 1
|
||||
}
|
||||
|
||||
// newMockStorage creates a new mockStorage.
|
||||
func newMockStorage() Storage {
|
||||
return &mockStorage{}
|
||||
|
||||
@ -728,6 +728,9 @@ type Storage interface {
|
||||
SetOption(k any, v any)
|
||||
// GetOption is a thin wrapper around sync.Map.
|
||||
GetOption(k any) (any, bool)
|
||||
// GetClusterID returns the physical cluster ID of the storage.
|
||||
// for nextgen, all keyspace in the storage share the same cluster ID.
|
||||
GetClusterID() uint64
|
||||
}
|
||||
|
||||
// EtcdBackend is used for judging a storage is a real TiKV.
|
||||
|
||||
@ -21,6 +21,7 @@ go_library(
|
||||
deps = [
|
||||
"//pkg/bindinfo",
|
||||
"//pkg/config",
|
||||
"//pkg/config/kerneltype",
|
||||
"//pkg/ddl",
|
||||
"//pkg/ddl/placement",
|
||||
"//pkg/ddl/schematracker",
|
||||
@ -43,6 +44,7 @@ go_library(
|
||||
"//pkg/extension/extensionimpl",
|
||||
"//pkg/infoschema",
|
||||
"//pkg/infoschema/context",
|
||||
"//pkg/keyspace",
|
||||
"//pkg/kv",
|
||||
"//pkg/meta",
|
||||
"//pkg/meta/metabuild",
|
||||
|
||||
@ -42,6 +42,7 @@ import (
|
||||
"github.com/pingcap/kvproto/pkg/kvrpcpb"
|
||||
"github.com/pingcap/tidb/pkg/bindinfo"
|
||||
"github.com/pingcap/tidb/pkg/config"
|
||||
"github.com/pingcap/tidb/pkg/config/kerneltype"
|
||||
"github.com/pingcap/tidb/pkg/ddl"
|
||||
"github.com/pingcap/tidb/pkg/ddl/placement"
|
||||
distsqlctx "github.com/pingcap/tidb/pkg/distsql/context"
|
||||
@ -61,6 +62,7 @@ import (
|
||||
"github.com/pingcap/tidb/pkg/extension/extensionimpl"
|
||||
"github.com/pingcap/tidb/pkg/infoschema"
|
||||
infoschemactx "github.com/pingcap/tidb/pkg/infoschema/context"
|
||||
"github.com/pingcap/tidb/pkg/keyspace"
|
||||
"github.com/pingcap/tidb/pkg/kv"
|
||||
"github.com/pingcap/tidb/pkg/meta"
|
||||
"github.com/pingcap/tidb/pkg/meta/metabuild"
|
||||
@ -96,6 +98,7 @@ import (
|
||||
"github.com/pingcap/tidb/pkg/statistics/handle/syncload"
|
||||
"github.com/pingcap/tidb/pkg/statistics/handle/usage"
|
||||
"github.com/pingcap/tidb/pkg/statistics/handle/usage/indexusage"
|
||||
kvstore "github.com/pingcap/tidb/pkg/store"
|
||||
storeerr "github.com/pingcap/tidb/pkg/store/driver/error"
|
||||
"github.com/pingcap/tidb/pkg/store/helper"
|
||||
"github.com/pingcap/tidb/pkg/table"
|
||||
@ -3484,6 +3487,17 @@ func BootstrapSession4DistExecution(store kv.Storage) (*domain.Domain, error) {
|
||||
// such as system time zone
|
||||
// - start domain and other routines.
|
||||
func bootstrapSessionImpl(ctx context.Context, store kv.Storage, createSessionsImpl func(store kv.Storage, cnt int) ([]*session, error)) (*domain.Domain, error) {
|
||||
ver := getStoreBootstrapVersionWithCache(store)
|
||||
if kerneltype.IsNextGen() && keyspace.IsRunningOnUser() {
|
||||
systemKSVer := mustGetStoreBootstrapVersion(kvstore.GetSystemStorage())
|
||||
if systemKSVer == notBootstrapped {
|
||||
logutil.BgLogger().Fatal("SYSTEM keyspace is not bootstrapped")
|
||||
} else if ver > systemKSVer {
|
||||
logutil.BgLogger().Fatal("bootstrap version of user keyspace must be smaller or equal to that of SYSTEM keyspace",
|
||||
zap.Int64("user", ver), zap.Int64("system", systemKSVer))
|
||||
}
|
||||
}
|
||||
|
||||
ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnBootstrap)
|
||||
cfg := config.GetGlobalConfig()
|
||||
if len(cfg.Instance.PluginLoad) > 0 {
|
||||
@ -3515,7 +3529,6 @@ func bootstrapSessionImpl(ctx context.Context, store kv.Storage, createSessionsI
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ver := getStoreBootstrapVersionWithCache(store)
|
||||
if ver < currentBootstrapVersion {
|
||||
runInBootstrapSession(store, ver)
|
||||
} else {
|
||||
@ -3798,20 +3811,20 @@ func runInBootstrapSession(store kv.Storage, ver int64) {
|
||||
}
|
||||
|
||||
func createSessions(store kv.Storage, cnt int) ([]*session, error) {
|
||||
return createSessionsImpl(store, cnt, createSession)
|
||||
return createSessionsImpl(store, cnt)
|
||||
}
|
||||
|
||||
func createSessions4DistExecution(store kv.Storage, cnt int) ([]*session, error) {
|
||||
domap.Delete(store)
|
||||
|
||||
return createSessionsImpl(store, cnt, createSession4DistExecution)
|
||||
return createSessionsImpl(store, cnt)
|
||||
}
|
||||
|
||||
func createSessionsImpl(store kv.Storage, cnt int, createSessionImpl func(kv.Storage) (*session, error)) ([]*session, error) {
|
||||
func createSessionsImpl(store kv.Storage, cnt int) ([]*session, error) {
|
||||
// Then we can create new dom
|
||||
ses := make([]*session, cnt)
|
||||
for i := range cnt {
|
||||
se, err := createSessionImpl(store)
|
||||
se, err := createSession(store)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -3829,10 +3842,6 @@ func createSession(store kv.Storage) (*session, error) {
|
||||
return createSessionWithOpt(store, nil)
|
||||
}
|
||||
|
||||
func createSession4DistExecution(store kv.Storage) (*session, error) {
|
||||
return createSessionWithOpt(store, nil)
|
||||
}
|
||||
|
||||
func createSessionWithOpt(store kv.Storage, opt *Opt) (*session, error) {
|
||||
dom, err := domap.Get(store)
|
||||
if err != nil {
|
||||
|
||||
@ -23,7 +23,6 @@ import (
|
||||
|
||||
"github.com/pingcap/tidb/pkg/bindinfo"
|
||||
"github.com/pingcap/tidb/pkg/config"
|
||||
"github.com/pingcap/tidb/pkg/domain"
|
||||
"github.com/pingcap/tidb/pkg/expression"
|
||||
"github.com/pingcap/tidb/pkg/infoschema"
|
||||
"github.com/pingcap/tidb/pkg/kv"
|
||||
@ -1912,10 +1911,7 @@ func upgradeToVer241(s sessiontypes.Session, _ int64) {
|
||||
|
||||
// writeClusterID writes cluster id into mysql.tidb
|
||||
func writeClusterID(s sessiontypes.Session) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(internalSQLTimeout)*time.Second)
|
||||
defer cancel()
|
||||
|
||||
clusterID := s.GetDomain().(*domain.Domain).GetPDClient().GetClusterID(ctx)
|
||||
clusterID := s.GetStore().GetClusterID()
|
||||
|
||||
mustExecute(s, `INSERT HIGH_PRIORITY INTO %n.%n VALUES (%?, %?, "TiDB Cluster ID.") ON DUPLICATE KEY UPDATE VARIABLE_VALUE= %?`,
|
||||
mysql.SystemDB,
|
||||
|
||||
@ -10,8 +10,10 @@ go_library(
|
||||
visibility = ["//visibility:public"],
|
||||
deps = [
|
||||
"//pkg/config",
|
||||
"//pkg/config/kerneltype",
|
||||
"//pkg/keyspace",
|
||||
"//pkg/kv",
|
||||
"//pkg/parser/terror",
|
||||
"//pkg/util",
|
||||
"//pkg/util/etcd",
|
||||
"//pkg/util/logutil",
|
||||
@ -36,10 +38,12 @@ go_test(
|
||||
],
|
||||
embed = [":store"],
|
||||
flaky = True,
|
||||
shard_count = 23,
|
||||
shard_count = 24,
|
||||
deps = [
|
||||
"//pkg/config",
|
||||
"//pkg/config/kerneltype",
|
||||
"//pkg/domain",
|
||||
"//pkg/keyspace",
|
||||
"//pkg/kv",
|
||||
"//pkg/store/mockstore",
|
||||
"//pkg/store/mockstore/unistore",
|
||||
|
||||
@ -25,7 +25,6 @@ go_library(
|
||||
"@com_github_tikv_pd_client//:client",
|
||||
"@com_github_tikv_pd_client//http",
|
||||
"@com_github_tikv_pd_client//opt",
|
||||
"@com_github_tikv_pd_client//pkg/caller",
|
||||
"@org_golang_google_grpc//:grpc",
|
||||
"@org_golang_google_grpc//keepalive",
|
||||
"@org_uber_go_zap//:zap",
|
||||
|
||||
@ -43,7 +43,6 @@ import (
|
||||
pd "github.com/tikv/pd/client"
|
||||
pdhttp "github.com/tikv/pd/client/http"
|
||||
"github.com/tikv/pd/client/opt"
|
||||
"github.com/tikv/pd/client/pkg/caller"
|
||||
"go.uber.org/zap"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/keepalive"
|
||||
@ -95,10 +94,6 @@ func WithPDClientConfig(client config.PDClient) Option {
|
||||
}
|
||||
}
|
||||
|
||||
func getKVStore(path string, tls config.Security) (kv.Storage, error) {
|
||||
return TiKVDriver{}.OpenWithOptions(path, WithSecurity(tls))
|
||||
}
|
||||
|
||||
// TiKVDriver implements engine TiKV.
|
||||
type TiKVDriver struct {
|
||||
pdConfig config.PDClient
|
||||
@ -109,7 +104,7 @@ type TiKVDriver struct {
|
||||
|
||||
// Open opens or creates an TiKV storage with given path using global config.
|
||||
// Path example: tikv://etcd-node1:port,etcd-node2:port?cluster=1&disableGC=false
|
||||
func (d TiKVDriver) Open(path string) (kv.Storage, error) {
|
||||
func (d *TiKVDriver) Open(path string) (kv.Storage, error) {
|
||||
return d.OpenWithOptions(path)
|
||||
}
|
||||
|
||||
@ -126,7 +121,7 @@ func (d *TiKVDriver) setDefaultAndOptions(options ...Option) {
|
||||
|
||||
// OpenWithOptions is used by other program that use tidb as a library, to avoid modifying GlobalConfig
|
||||
// unspecified options will be set to global config
|
||||
func (d TiKVDriver) OpenWithOptions(path string, options ...Option) (resStore kv.Storage, err error) {
|
||||
func (d *TiKVDriver) OpenWithOptions(path string, options ...Option) (resStore kv.Storage, err error) {
|
||||
mc.Lock()
|
||||
defer mc.Unlock()
|
||||
d.setDefaultAndOptions(options...)
|
||||
@ -156,11 +151,17 @@ func (d TiKVDriver) OpenWithOptions(path string, options ...Option) (resStore kv
|
||||
}
|
||||
}()
|
||||
|
||||
pdCli, err = pd.NewClient(caller.Component("tidb-tikv-driver"), etcdAddrs, pd.SecurityOption{
|
||||
CAPath: d.security.ClusterSSLCA,
|
||||
CertPath: d.security.ClusterSSLCert,
|
||||
KeyPath: d.security.ClusterSSLKey,
|
||||
},
|
||||
var apiCtx = pd.NewAPIContextV1()
|
||||
if len(keyspaceName) > 0 {
|
||||
apiCtx = pd.NewAPIContextV2(keyspaceName)
|
||||
}
|
||||
|
||||
pdCli, err = pd.NewClientWithAPIContext(context.Background(), apiCtx, "tidb-tikv-driver", etcdAddrs,
|
||||
pd.SecurityOption{
|
||||
CAPath: d.security.ClusterSSLCA,
|
||||
CertPath: d.security.ClusterSSLCert,
|
||||
KeyPath: d.security.ClusterSSLKey,
|
||||
},
|
||||
opt.WithGRPCDialOptions(
|
||||
// keep the same with etcd, see
|
||||
// https://github.com/etcd-io/etcd/blob/5704c6148d798ea444db26a966394406d8c10526/server/etcdserver/api/v3rpc/grpc.go#L34
|
||||
@ -178,7 +179,8 @@ func (d TiKVDriver) OpenWithOptions(path string, options ...Option) (resStore kv
|
||||
pdCli = util.InterceptedPDClient{Client: pdCli}
|
||||
|
||||
// FIXME: uuid will be a very long and ugly string, simplify it.
|
||||
uuid := fmt.Sprintf("tikv-%v", pdCli.GetClusterID(context.TODO()))
|
||||
clusterID := pdCli.GetClusterID(context.TODO())
|
||||
uuid := fmt.Sprintf("tikv-%v/%s", clusterID, keyspaceName)
|
||||
if store, ok := mc.cache[uuid]; ok {
|
||||
pdCli.Close()
|
||||
return store, nil
|
||||
@ -241,6 +243,7 @@ func (d TiKVDriver) OpenWithOptions(path string, options ...Option) (resStore kv
|
||||
enableGC: !disableGC,
|
||||
coprStore: coprStore,
|
||||
codec: codec,
|
||||
clusterID: clusterID,
|
||||
}
|
||||
|
||||
mc.cache[uuid] = store
|
||||
@ -257,6 +260,7 @@ type tikvStore struct {
|
||||
coprStore *copr.Store
|
||||
codec tikv.Codec
|
||||
opts sync.Map
|
||||
clusterID uint64
|
||||
}
|
||||
|
||||
// GetOption wraps around sync.Map.
|
||||
@ -427,6 +431,10 @@ func (s *tikvStore) GetCodec() tikv.Codec {
|
||||
return s.codec
|
||||
}
|
||||
|
||||
func (s *tikvStore) GetClusterID() uint64 {
|
||||
return s.clusterID
|
||||
}
|
||||
|
||||
// injectTraceClient injects trace info to the tikv request
|
||||
type injectTraceClient struct {
|
||||
tikv.Client
|
||||
|
||||
@ -78,6 +78,7 @@ type Storage interface {
|
||||
GetPDHTTPClient() pd.Client
|
||||
GetOption(any) (any, bool)
|
||||
SetOption(any, any)
|
||||
GetClusterID() uint64
|
||||
}
|
||||
|
||||
// Helper is a middleware to get some information from tikv/pd. It can be used for TiDB's http api or mem table.
|
||||
|
||||
@ -168,3 +168,7 @@ type MockLockWaitSetter interface {
|
||||
func (s *mockStorage) SetMockLockWaits(lockWaits []*deadlockpb.WaitForEntry) {
|
||||
s.LockWaits = lockWaits
|
||||
}
|
||||
|
||||
func (s *mockStorage) GetClusterID() uint64 {
|
||||
return 1
|
||||
}
|
||||
|
||||
@ -15,6 +15,7 @@
|
||||
package store
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/url"
|
||||
"strings"
|
||||
"sync"
|
||||
@ -22,30 +23,40 @@ import (
|
||||
"github.com/pingcap/errors"
|
||||
"github.com/pingcap/kvproto/pkg/pdpb"
|
||||
"github.com/pingcap/tidb/pkg/config"
|
||||
"github.com/pingcap/tidb/pkg/config/kerneltype"
|
||||
"github.com/pingcap/tidb/pkg/keyspace"
|
||||
"github.com/pingcap/tidb/pkg/kv"
|
||||
"github.com/pingcap/tidb/pkg/parser/terror"
|
||||
"github.com/pingcap/tidb/pkg/util"
|
||||
"github.com/pingcap/tidb/pkg/util/logutil"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
var stores = make(map[config.StoreType]kv.Driver)
|
||||
var storesLock sync.RWMutex
|
||||
var (
|
||||
storeDrivers = make(map[config.StoreType]kv.Driver)
|
||||
storeDriverLock sync.RWMutex
|
||||
|
||||
// systemStore is the kv.Storage for the SYSTEM keyspace.
|
||||
// which is only initialized and used in nextgen kernel.
|
||||
// for description of SYSTEM keyspace, see keyspace.System.
|
||||
systemStore kv.Storage
|
||||
)
|
||||
|
||||
// Register registers a kv storage with unique name and its associated Driver.
|
||||
// TODO: remove this function and use driver directly, TiDB is not a SDK.
|
||||
func Register(tp config.StoreType, driver kv.Driver) error {
|
||||
storesLock.Lock()
|
||||
defer storesLock.Unlock()
|
||||
storeDriverLock.Lock()
|
||||
defer storeDriverLock.Unlock()
|
||||
|
||||
if !tp.Valid() {
|
||||
return errors.Errorf("invalid storage type %s", tp)
|
||||
}
|
||||
|
||||
if _, ok := stores[tp]; ok {
|
||||
if _, ok := storeDrivers[tp]; ok {
|
||||
return errors.Errorf("%s is already registered", tp)
|
||||
}
|
||||
|
||||
stores[tp] = driver
|
||||
storeDrivers[tp] = driver
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -91,9 +102,9 @@ func newStoreWithRetry(path string, maxRetries int) (kv.Storage, error) {
|
||||
}
|
||||
|
||||
func loadDriver(tp config.StoreType) (kv.Driver, bool) {
|
||||
storesLock.RLock()
|
||||
defer storesLock.RUnlock()
|
||||
d, ok := stores[tp]
|
||||
storeDriverLock.RLock()
|
||||
defer storeDriverLock.RUnlock()
|
||||
d, ok := storeDrivers[tp]
|
||||
return d, ok
|
||||
}
|
||||
|
||||
@ -125,3 +136,35 @@ func IsKeyspaceNotExistError(err error) bool {
|
||||
}
|
||||
return strings.Contains(err.Error(), pdpb.ErrorType_ENTRY_NOT_FOUND.String())
|
||||
}
|
||||
|
||||
// MustInitStorage initializes the kv.Storage for this instance.
|
||||
func MustInitStorage(keyspaceName string) kv.Storage {
|
||||
defaultStore := mustInitStorage(keyspaceName)
|
||||
if kerneltype.IsNextGen() {
|
||||
if keyspace.IsRunningOnUser() {
|
||||
systemStore = mustInitStorage(keyspace.System)
|
||||
} else {
|
||||
systemStore = defaultStore
|
||||
}
|
||||
}
|
||||
return defaultStore
|
||||
}
|
||||
|
||||
// GetSystemStorage returns the kv.Storage for the SYSTEM keyspace.
|
||||
func GetSystemStorage() kv.Storage {
|
||||
return systemStore
|
||||
}
|
||||
|
||||
func mustInitStorage(keyspaceName string) kv.Storage {
|
||||
cfg := config.GetGlobalConfig()
|
||||
var fullPath string
|
||||
if keyspaceName == "" {
|
||||
fullPath = fmt.Sprintf("%s://%s", cfg.Store, cfg.Path)
|
||||
} else {
|
||||
fullPath = fmt.Sprintf("%s://%s?keyspaceName=%s", cfg.Store, cfg.Path, keyspaceName)
|
||||
}
|
||||
var err error
|
||||
storage, err := New(fullPath)
|
||||
terror.MustNil(err)
|
||||
return storage
|
||||
}
|
||||
|
||||
@ -24,6 +24,8 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/pingcap/tidb/pkg/config"
|
||||
"github.com/pingcap/tidb/pkg/config/kerneltype"
|
||||
"github.com/pingcap/tidb/pkg/keyspace"
|
||||
"github.com/pingcap/tidb/pkg/kv"
|
||||
"github.com/pingcap/tidb/pkg/store/mockstore"
|
||||
"github.com/stretchr/testify/require"
|
||||
@ -865,3 +867,27 @@ func TestSetAssertion(t *testing.T) {
|
||||
|
||||
require.NoError(t, txn.Rollback())
|
||||
}
|
||||
|
||||
func TestInitStorage(t *testing.T) {
|
||||
require.NoError(t, Register(config.StoreTypeUniStore, mockstore.EmbedUnistoreDriver{}))
|
||||
if kerneltype.IsClassic() {
|
||||
storage := MustInitStorage("")
|
||||
defer storage.Close()
|
||||
require.Nil(t, GetSystemStorage())
|
||||
} else {
|
||||
bak := *config.GetGlobalConfig()
|
||||
config.GetGlobalConfig().Path = t.TempDir()
|
||||
config.GetGlobalConfig().KeyspaceName = keyspace.System
|
||||
t.Cleanup(func() {
|
||||
config.StoreGlobalConfig(&bak)
|
||||
})
|
||||
|
||||
storage := MustInitStorage(keyspace.System)
|
||||
require.NotNil(t, GetSystemStorage())
|
||||
defer storage.Close()
|
||||
require.Same(t, storage, GetSystemStorage())
|
||||
|
||||
// for user keyspace, we need to init 2 store with different PATH, we cannot
|
||||
// do it for uni-store, so skip it.
|
||||
}
|
||||
}
|
||||
|
||||
@ -93,3 +93,8 @@ func (*Store) GetOption(_ any) (any, bool) {
|
||||
|
||||
// SetOption implements kv.Storage interface.
|
||||
func (*Store) SetOption(_, _ any) {}
|
||||
|
||||
// GetClusterID implements kv.Storage interface.
|
||||
func (*Store) GetClusterID() uint64 {
|
||||
return 1
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user