diff --git a/br/pkg/gluetikv/glue.go b/br/pkg/gluetikv/glue.go index 2fd990e92d..3ac159aef8 100644 --- a/br/pkg/gluetikv/glue.go +++ b/br/pkg/gluetikv/glue.go @@ -46,7 +46,7 @@ func (Glue) Open(path string, option pd.SecurityOption) (kv.Storage, error) { conf.Security.ClusterSSLKey = option.KeyPath config.StoreGlobalConfig(conf) } - return driver.TiKVDriver{}.Open(path) + return (&driver.TiKVDriver{}).Open(path) } // OwnsStorage implements glue.Glue. diff --git a/cmd/benchdb/main.go b/cmd/benchdb/main.go index 511b385db4..cb62db8089 100644 --- a/cmd/benchdb/main.go +++ b/cmd/benchdb/main.go @@ -60,7 +60,7 @@ func main() { flag.PrintDefaults() err := logutil.InitLogger(logutil.NewLogConfig(*logLevel, logutil.DefaultLogFormat, "", "", logutil.EmptyFileLogConfig, false)) terror.MustNil(err) - err = store.Register(config.StoreTypeTiKV, driver.TiKVDriver{}) + err = store.Register(config.StoreTypeTiKV, &driver.TiKVDriver{}) terror.MustNil(err) ut := newBenchDB() works := strings.Split(*runJobs, "|") diff --git a/cmd/ddltest/ddl_test.go b/cmd/ddltest/ddl_test.go index b362a9f3c6..41e031b3fe 100644 --- a/cmd/ddltest/ddl_test.go +++ b/cmd/ddltest/ddl_test.go @@ -1163,5 +1163,5 @@ func addEnvPath(newPath string) { } func init() { - _ = store.Register(config.StoreTypeTiKV, tidbdriver.TiKVDriver{}) + _ = store.Register(config.StoreTypeTiKV, &tidbdriver.TiKVDriver{}) } diff --git a/cmd/tidb-server/main.go b/cmd/tidb-server/main.go index 01a94bd29b..148ec85aea 100644 --- a/cmd/tidb-server/main.go +++ b/cmd/tidb-server/main.go @@ -407,7 +407,7 @@ func setCPUAffinity() { } func registerStores() { - err := kvstore.Register(config.StoreTypeTiKV, driver.TiKVDriver{}) + err := kvstore.Register(config.StoreTypeTiKV, &driver.TiKVDriver{}) terror.MustNil(err) err = kvstore.Register(config.StoreTypeMockTiKV, mockstore.MockTiKVDriver{}) terror.MustNil(err) @@ -416,16 +416,7 @@ func registerStores() { } func createStoreDDLOwnerMgrAndDomain(keyspaceName string) (kv.Storage, *domain.Domain) { - cfg := config.GetGlobalConfig() - var fullPath string - if keyspaceName == "" { - fullPath = fmt.Sprintf("%s://%s", cfg.Store, cfg.Path) - } else { - fullPath = fmt.Sprintf("%s://%s?keyspaceName=%s", cfg.Store, cfg.Path, keyspaceName) - } - var err error - storage, err := kvstore.New(fullPath) - terror.MustNil(err) + storage := kvstore.MustInitStorage(keyspaceName) if tikvStore, ok := storage.(kv.StorageWithPD); ok { pdhttpCli := tikvStore.GetPDHTTPClient() // unistore also implements kv.StorageWithPD, but it does not have PD client. @@ -441,7 +432,7 @@ func createStoreDDLOwnerMgrAndDomain(keyspaceName string) (kv.Storage, *domain.D copr.GlobalMPPFailedStoreProber.Run() mppcoordmanager.InstanceMPPCoordinatorManager.Run() // Bootstrap a session to load information schema. - err = ddl.StartOwnerManager(context.Background(), storage) + err := ddl.StartOwnerManager(context.Background(), storage) terror.MustNil(err) dom, err := session.BootstrapSession(storage) terror.MustNil(err) @@ -934,6 +925,10 @@ func closeDDLOwnerMgrDomainAndStorage(storage kv.Storage, dom *domain.Domain) { mppcoordmanager.InstanceMPPCoordinatorManager.Stop() err := storage.Close() terror.Log(errors.Trace(err)) + if kerneltype.IsNextGen() && keyspace.IsRunningOnUser() { + err = kvstore.GetSystemStorage().Close() + terror.Log(errors.Annotate(err, "close system storage")) + } } // The amount of time we wait for the ongoing txt to finished. diff --git a/lightning/pkg/importer/import.go b/lightning/pkg/importer/import.go index 5ca274b6f9..bf509d2681 100644 --- a/lightning/pkg/importer/import.go +++ b/lightning/pkg/importer/import.go @@ -1351,7 +1351,7 @@ func (rc *Controller) importTables(ctx context.Context) (finalErr error) { u = strings.TrimPrefix(u, "https://") urlsWithoutScheme = append(urlsWithoutScheme, u) } - kvStore, err = driver.TiKVDriver{}.OpenWithOptions( + kvStore, err = (&driver.TiKVDriver{}).OpenWithOptions( fmt.Sprintf( "tikv://%s?disableGC=true&keyspaceName=%s", strings.Join(urlsWithoutScheme, ","), rc.keyspaceName, diff --git a/pkg/domain/domain.go b/pkg/domain/domain.go index ee8fd309c4..5b4e1b40c3 100644 --- a/pkg/domain/domain.go +++ b/pkg/domain/domain.go @@ -134,8 +134,8 @@ func NewMockDomain() *Domain { return do } -// Domain represents a storage space. Different domains can use the same database name. -// Multiple domains can be used in parallel without synchronization. +// Domain manages life cycle of nearly all other components related to SQL execution +// of a TiDB instance, only one domain can exist at a time. type Domain struct { store kv.Storage infoCache *infoschema.InfoCache diff --git a/pkg/keyspace/BUILD.bazel b/pkg/keyspace/BUILD.bazel index 7fdfdf0bb7..6e8ecdf585 100644 --- a/pkg/keyspace/BUILD.bazel +++ b/pkg/keyspace/BUILD.bazel @@ -2,7 +2,10 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "keyspace", - srcs = ["keyspace.go"], + srcs = [ + "doc.go", + "keyspace.go", + ], importpath = "github.com/pingcap/tidb/pkg/keyspace", visibility = ["//visibility:public"], deps = [ diff --git a/pkg/keyspace/doc.go b/pkg/keyspace/doc.go new file mode 100644 index 0000000000..e9292a976f --- /dev/null +++ b/pkg/keyspace/doc.go @@ -0,0 +1,35 @@ +// Copyright 2025 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package keyspace provides utilities for keyspace for nextgen TiDB. +// +// Keyspace are used to isolate data and operations, allowing for multi-tenancy +// in next generation TiDB. Each keyspace represents a logical cluster on top of +// the underlying physical cluster. +// +// There is a special keyspace named "SYSTEM", which is reserved for system-level +// services and data, currently, only the DXF service uses this keyspace. +// As user keyspace depends on SYSTEM keyspace, we need to make sure SYSTEM +// keyspace exist before user keyspace start serving any user traffic. So for the +// deployment of nextgen cluster, we need to: +// - Deploy PD/TiKV and other components, wait them to be ready to serve TiDB access. +// - Deploy SYSTEM keyspace, wait it fully bootstrapped. +// - Deploy other user keyspace, they can be deployed concurrently. +// +// During upgrade, we also need to follow above order, i.e. We need to upgrade +// the SYSTEM keyspace first, then user keyspace. +// +// Note: serverless also use keyspace, and have the special NULL and DEFAULT +// keyspace, while nextgen hasn't. +package keyspace diff --git a/pkg/keyspace/keyspace.go b/pkg/keyspace/keyspace.go index 9ebd7b3456..33c0920b13 100644 --- a/pkg/keyspace/keyspace.go +++ b/pkg/keyspace/keyspace.go @@ -27,6 +27,9 @@ import ( ) const ( + // System is the keyspace name for SYSTEM keyspace. + // see doc.go for more detail. + System = "SYSTEM" // tidbKeyspaceEtcdPathPrefix is the keyspace prefix for etcd namespace tidbKeyspaceEtcdPathPrefix = "/keyspaces/tidb/" ) @@ -87,3 +90,8 @@ func WrapZapcoreWithKeyspace() zap.Option { return core }) } + +// IsRunningOnUser checks if keyspace of current instance is a user keyspace. +func IsRunningOnUser() bool { + return config.GetGlobalKeyspaceName() != System +} diff --git a/pkg/kv/interface_mock_test.go b/pkg/kv/interface_mock_test.go index 5b56ee8361..31e46350c8 100644 --- a/pkg/kv/interface_mock_test.go +++ b/pkg/kv/interface_mock_test.go @@ -279,6 +279,10 @@ func (s *mockStorage) GetMinSafeTS(txnScope string) uint64 { return 0 } +func (s *mockStorage) GetClusterID() uint64 { + return 1 +} + // newMockStorage creates a new mockStorage. func newMockStorage() Storage { return &mockStorage{} diff --git a/pkg/kv/kv.go b/pkg/kv/kv.go index 411734d2bd..73f1f6d177 100644 --- a/pkg/kv/kv.go +++ b/pkg/kv/kv.go @@ -728,6 +728,9 @@ type Storage interface { SetOption(k any, v any) // GetOption is a thin wrapper around sync.Map. GetOption(k any) (any, bool) + // GetClusterID returns the physical cluster ID of the storage. + // for nextgen, all keyspace in the storage share the same cluster ID. + GetClusterID() uint64 } // EtcdBackend is used for judging a storage is a real TiKV. diff --git a/pkg/session/BUILD.bazel b/pkg/session/BUILD.bazel index 5b93bcfe08..d6e1805a3d 100644 --- a/pkg/session/BUILD.bazel +++ b/pkg/session/BUILD.bazel @@ -21,6 +21,7 @@ go_library( deps = [ "//pkg/bindinfo", "//pkg/config", + "//pkg/config/kerneltype", "//pkg/ddl", "//pkg/ddl/placement", "//pkg/ddl/schematracker", @@ -43,6 +44,7 @@ go_library( "//pkg/extension/extensionimpl", "//pkg/infoschema", "//pkg/infoschema/context", + "//pkg/keyspace", "//pkg/kv", "//pkg/meta", "//pkg/meta/metabuild", diff --git a/pkg/session/session.go b/pkg/session/session.go index 5f180adeb7..e861d62ed7 100644 --- a/pkg/session/session.go +++ b/pkg/session/session.go @@ -42,6 +42,7 @@ import ( "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/tidb/pkg/bindinfo" "github.com/pingcap/tidb/pkg/config" + "github.com/pingcap/tidb/pkg/config/kerneltype" "github.com/pingcap/tidb/pkg/ddl" "github.com/pingcap/tidb/pkg/ddl/placement" distsqlctx "github.com/pingcap/tidb/pkg/distsql/context" @@ -61,6 +62,7 @@ import ( "github.com/pingcap/tidb/pkg/extension/extensionimpl" "github.com/pingcap/tidb/pkg/infoschema" infoschemactx "github.com/pingcap/tidb/pkg/infoschema/context" + "github.com/pingcap/tidb/pkg/keyspace" "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/meta" "github.com/pingcap/tidb/pkg/meta/metabuild" @@ -96,6 +98,7 @@ import ( "github.com/pingcap/tidb/pkg/statistics/handle/syncload" "github.com/pingcap/tidb/pkg/statistics/handle/usage" "github.com/pingcap/tidb/pkg/statistics/handle/usage/indexusage" + kvstore "github.com/pingcap/tidb/pkg/store" storeerr "github.com/pingcap/tidb/pkg/store/driver/error" "github.com/pingcap/tidb/pkg/store/helper" "github.com/pingcap/tidb/pkg/table" @@ -3484,6 +3487,17 @@ func BootstrapSession4DistExecution(store kv.Storage) (*domain.Domain, error) { // such as system time zone // - start domain and other routines. func bootstrapSessionImpl(ctx context.Context, store kv.Storage, createSessionsImpl func(store kv.Storage, cnt int) ([]*session, error)) (*domain.Domain, error) { + ver := getStoreBootstrapVersionWithCache(store) + if kerneltype.IsNextGen() && keyspace.IsRunningOnUser() { + systemKSVer := mustGetStoreBootstrapVersion(kvstore.GetSystemStorage()) + if systemKSVer == notBootstrapped { + logutil.BgLogger().Fatal("SYSTEM keyspace is not bootstrapped") + } else if ver > systemKSVer { + logutil.BgLogger().Fatal("bootstrap version of user keyspace must be smaller or equal to that of SYSTEM keyspace", + zap.Int64("user", ver), zap.Int64("system", systemKSVer)) + } + } + ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnBootstrap) cfg := config.GetGlobalConfig() if len(cfg.Instance.PluginLoad) > 0 { @@ -3515,7 +3529,6 @@ func bootstrapSessionImpl(ctx context.Context, store kv.Storage, createSessionsI if err != nil { return nil, err } - ver := getStoreBootstrapVersionWithCache(store) if ver < currentBootstrapVersion { runInBootstrapSession(store, ver) } else { @@ -3798,20 +3811,20 @@ func runInBootstrapSession(store kv.Storage, ver int64) { } func createSessions(store kv.Storage, cnt int) ([]*session, error) { - return createSessionsImpl(store, cnt, createSession) + return createSessionsImpl(store, cnt) } func createSessions4DistExecution(store kv.Storage, cnt int) ([]*session, error) { domap.Delete(store) - return createSessionsImpl(store, cnt, createSession4DistExecution) + return createSessionsImpl(store, cnt) } -func createSessionsImpl(store kv.Storage, cnt int, createSessionImpl func(kv.Storage) (*session, error)) ([]*session, error) { +func createSessionsImpl(store kv.Storage, cnt int) ([]*session, error) { // Then we can create new dom ses := make([]*session, cnt) for i := range cnt { - se, err := createSessionImpl(store) + se, err := createSession(store) if err != nil { return nil, err } @@ -3829,10 +3842,6 @@ func createSession(store kv.Storage) (*session, error) { return createSessionWithOpt(store, nil) } -func createSession4DistExecution(store kv.Storage) (*session, error) { - return createSessionWithOpt(store, nil) -} - func createSessionWithOpt(store kv.Storage, opt *Opt) (*session, error) { dom, err := domap.Get(store) if err != nil { diff --git a/pkg/session/upgrade.go b/pkg/session/upgrade.go index 85d198a93d..ea9afc3d33 100644 --- a/pkg/session/upgrade.go +++ b/pkg/session/upgrade.go @@ -23,7 +23,6 @@ import ( "github.com/pingcap/tidb/pkg/bindinfo" "github.com/pingcap/tidb/pkg/config" - "github.com/pingcap/tidb/pkg/domain" "github.com/pingcap/tidb/pkg/expression" "github.com/pingcap/tidb/pkg/infoschema" "github.com/pingcap/tidb/pkg/kv" @@ -1912,10 +1911,7 @@ func upgradeToVer241(s sessiontypes.Session, _ int64) { // writeClusterID writes cluster id into mysql.tidb func writeClusterID(s sessiontypes.Session) { - ctx, cancel := context.WithTimeout(context.Background(), time.Duration(internalSQLTimeout)*time.Second) - defer cancel() - - clusterID := s.GetDomain().(*domain.Domain).GetPDClient().GetClusterID(ctx) + clusterID := s.GetStore().GetClusterID() mustExecute(s, `INSERT HIGH_PRIORITY INTO %n.%n VALUES (%?, %?, "TiDB Cluster ID.") ON DUPLICATE KEY UPDATE VARIABLE_VALUE= %?`, mysql.SystemDB, diff --git a/pkg/store/BUILD.bazel b/pkg/store/BUILD.bazel index a1df57ab5c..59392afc5c 100644 --- a/pkg/store/BUILD.bazel +++ b/pkg/store/BUILD.bazel @@ -10,8 +10,10 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/config", + "//pkg/config/kerneltype", "//pkg/keyspace", "//pkg/kv", + "//pkg/parser/terror", "//pkg/util", "//pkg/util/etcd", "//pkg/util/logutil", @@ -36,10 +38,12 @@ go_test( ], embed = [":store"], flaky = True, - shard_count = 23, + shard_count = 24, deps = [ "//pkg/config", + "//pkg/config/kerneltype", "//pkg/domain", + "//pkg/keyspace", "//pkg/kv", "//pkg/store/mockstore", "//pkg/store/mockstore/unistore", diff --git a/pkg/store/driver/BUILD.bazel b/pkg/store/driver/BUILD.bazel index 1a9a3391d3..3ec4eade7f 100644 --- a/pkg/store/driver/BUILD.bazel +++ b/pkg/store/driver/BUILD.bazel @@ -25,7 +25,6 @@ go_library( "@com_github_tikv_pd_client//:client", "@com_github_tikv_pd_client//http", "@com_github_tikv_pd_client//opt", - "@com_github_tikv_pd_client//pkg/caller", "@org_golang_google_grpc//:grpc", "@org_golang_google_grpc//keepalive", "@org_uber_go_zap//:zap", diff --git a/pkg/store/driver/tikv_driver.go b/pkg/store/driver/tikv_driver.go index 63c9bd8d7a..abab9b37d1 100644 --- a/pkg/store/driver/tikv_driver.go +++ b/pkg/store/driver/tikv_driver.go @@ -43,7 +43,6 @@ import ( pd "github.com/tikv/pd/client" pdhttp "github.com/tikv/pd/client/http" "github.com/tikv/pd/client/opt" - "github.com/tikv/pd/client/pkg/caller" "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/keepalive" @@ -95,10 +94,6 @@ func WithPDClientConfig(client config.PDClient) Option { } } -func getKVStore(path string, tls config.Security) (kv.Storage, error) { - return TiKVDriver{}.OpenWithOptions(path, WithSecurity(tls)) -} - // TiKVDriver implements engine TiKV. type TiKVDriver struct { pdConfig config.PDClient @@ -109,7 +104,7 @@ type TiKVDriver struct { // Open opens or creates an TiKV storage with given path using global config. // Path example: tikv://etcd-node1:port,etcd-node2:port?cluster=1&disableGC=false -func (d TiKVDriver) Open(path string) (kv.Storage, error) { +func (d *TiKVDriver) Open(path string) (kv.Storage, error) { return d.OpenWithOptions(path) } @@ -126,7 +121,7 @@ func (d *TiKVDriver) setDefaultAndOptions(options ...Option) { // OpenWithOptions is used by other program that use tidb as a library, to avoid modifying GlobalConfig // unspecified options will be set to global config -func (d TiKVDriver) OpenWithOptions(path string, options ...Option) (resStore kv.Storage, err error) { +func (d *TiKVDriver) OpenWithOptions(path string, options ...Option) (resStore kv.Storage, err error) { mc.Lock() defer mc.Unlock() d.setDefaultAndOptions(options...) @@ -156,11 +151,17 @@ func (d TiKVDriver) OpenWithOptions(path string, options ...Option) (resStore kv } }() - pdCli, err = pd.NewClient(caller.Component("tidb-tikv-driver"), etcdAddrs, pd.SecurityOption{ - CAPath: d.security.ClusterSSLCA, - CertPath: d.security.ClusterSSLCert, - KeyPath: d.security.ClusterSSLKey, - }, + var apiCtx = pd.NewAPIContextV1() + if len(keyspaceName) > 0 { + apiCtx = pd.NewAPIContextV2(keyspaceName) + } + + pdCli, err = pd.NewClientWithAPIContext(context.Background(), apiCtx, "tidb-tikv-driver", etcdAddrs, + pd.SecurityOption{ + CAPath: d.security.ClusterSSLCA, + CertPath: d.security.ClusterSSLCert, + KeyPath: d.security.ClusterSSLKey, + }, opt.WithGRPCDialOptions( // keep the same with etcd, see // https://github.com/etcd-io/etcd/blob/5704c6148d798ea444db26a966394406d8c10526/server/etcdserver/api/v3rpc/grpc.go#L34 @@ -178,7 +179,8 @@ func (d TiKVDriver) OpenWithOptions(path string, options ...Option) (resStore kv pdCli = util.InterceptedPDClient{Client: pdCli} // FIXME: uuid will be a very long and ugly string, simplify it. - uuid := fmt.Sprintf("tikv-%v", pdCli.GetClusterID(context.TODO())) + clusterID := pdCli.GetClusterID(context.TODO()) + uuid := fmt.Sprintf("tikv-%v/%s", clusterID, keyspaceName) if store, ok := mc.cache[uuid]; ok { pdCli.Close() return store, nil @@ -241,6 +243,7 @@ func (d TiKVDriver) OpenWithOptions(path string, options ...Option) (resStore kv enableGC: !disableGC, coprStore: coprStore, codec: codec, + clusterID: clusterID, } mc.cache[uuid] = store @@ -257,6 +260,7 @@ type tikvStore struct { coprStore *copr.Store codec tikv.Codec opts sync.Map + clusterID uint64 } // GetOption wraps around sync.Map. @@ -427,6 +431,10 @@ func (s *tikvStore) GetCodec() tikv.Codec { return s.codec } +func (s *tikvStore) GetClusterID() uint64 { + return s.clusterID +} + // injectTraceClient injects trace info to the tikv request type injectTraceClient struct { tikv.Client diff --git a/pkg/store/helper/helper.go b/pkg/store/helper/helper.go index 95a3111ade..28c1431b65 100644 --- a/pkg/store/helper/helper.go +++ b/pkg/store/helper/helper.go @@ -78,6 +78,7 @@ type Storage interface { GetPDHTTPClient() pd.Client GetOption(any) (any, bool) SetOption(any, any) + GetClusterID() uint64 } // Helper is a middleware to get some information from tikv/pd. It can be used for TiDB's http api or mem table. diff --git a/pkg/store/mockstore/mockstorage/storage.go b/pkg/store/mockstore/mockstorage/storage.go index ca70712d8e..e56bc3719d 100644 --- a/pkg/store/mockstore/mockstorage/storage.go +++ b/pkg/store/mockstore/mockstorage/storage.go @@ -168,3 +168,7 @@ type MockLockWaitSetter interface { func (s *mockStorage) SetMockLockWaits(lockWaits []*deadlockpb.WaitForEntry) { s.LockWaits = lockWaits } + +func (s *mockStorage) GetClusterID() uint64 { + return 1 +} diff --git a/pkg/store/store.go b/pkg/store/store.go index 84722c48aa..e83fbd94a9 100644 --- a/pkg/store/store.go +++ b/pkg/store/store.go @@ -15,6 +15,7 @@ package store import ( + "fmt" "net/url" "strings" "sync" @@ -22,30 +23,40 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/tidb/pkg/config" + "github.com/pingcap/tidb/pkg/config/kerneltype" + "github.com/pingcap/tidb/pkg/keyspace" "github.com/pingcap/tidb/pkg/kv" + "github.com/pingcap/tidb/pkg/parser/terror" "github.com/pingcap/tidb/pkg/util" "github.com/pingcap/tidb/pkg/util/logutil" "go.uber.org/zap" ) -var stores = make(map[config.StoreType]kv.Driver) -var storesLock sync.RWMutex +var ( + storeDrivers = make(map[config.StoreType]kv.Driver) + storeDriverLock sync.RWMutex + + // systemStore is the kv.Storage for the SYSTEM keyspace. + // which is only initialized and used in nextgen kernel. + // for description of SYSTEM keyspace, see keyspace.System. + systemStore kv.Storage +) // Register registers a kv storage with unique name and its associated Driver. // TODO: remove this function and use driver directly, TiDB is not a SDK. func Register(tp config.StoreType, driver kv.Driver) error { - storesLock.Lock() - defer storesLock.Unlock() + storeDriverLock.Lock() + defer storeDriverLock.Unlock() if !tp.Valid() { return errors.Errorf("invalid storage type %s", tp) } - if _, ok := stores[tp]; ok { + if _, ok := storeDrivers[tp]; ok { return errors.Errorf("%s is already registered", tp) } - stores[tp] = driver + storeDrivers[tp] = driver return nil } @@ -91,9 +102,9 @@ func newStoreWithRetry(path string, maxRetries int) (kv.Storage, error) { } func loadDriver(tp config.StoreType) (kv.Driver, bool) { - storesLock.RLock() - defer storesLock.RUnlock() - d, ok := stores[tp] + storeDriverLock.RLock() + defer storeDriverLock.RUnlock() + d, ok := storeDrivers[tp] return d, ok } @@ -125,3 +136,35 @@ func IsKeyspaceNotExistError(err error) bool { } return strings.Contains(err.Error(), pdpb.ErrorType_ENTRY_NOT_FOUND.String()) } + +// MustInitStorage initializes the kv.Storage for this instance. +func MustInitStorage(keyspaceName string) kv.Storage { + defaultStore := mustInitStorage(keyspaceName) + if kerneltype.IsNextGen() { + if keyspace.IsRunningOnUser() { + systemStore = mustInitStorage(keyspace.System) + } else { + systemStore = defaultStore + } + } + return defaultStore +} + +// GetSystemStorage returns the kv.Storage for the SYSTEM keyspace. +func GetSystemStorage() kv.Storage { + return systemStore +} + +func mustInitStorage(keyspaceName string) kv.Storage { + cfg := config.GetGlobalConfig() + var fullPath string + if keyspaceName == "" { + fullPath = fmt.Sprintf("%s://%s", cfg.Store, cfg.Path) + } else { + fullPath = fmt.Sprintf("%s://%s?keyspaceName=%s", cfg.Store, cfg.Path, keyspaceName) + } + var err error + storage, err := New(fullPath) + terror.MustNil(err) + return storage +} diff --git a/pkg/store/store_test.go b/pkg/store/store_test.go index 6453703efc..0ebf7a8947 100644 --- a/pkg/store/store_test.go +++ b/pkg/store/store_test.go @@ -24,6 +24,8 @@ import ( "time" "github.com/pingcap/tidb/pkg/config" + "github.com/pingcap/tidb/pkg/config/kerneltype" + "github.com/pingcap/tidb/pkg/keyspace" "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/store/mockstore" "github.com/stretchr/testify/require" @@ -865,3 +867,27 @@ func TestSetAssertion(t *testing.T) { require.NoError(t, txn.Rollback()) } + +func TestInitStorage(t *testing.T) { + require.NoError(t, Register(config.StoreTypeUniStore, mockstore.EmbedUnistoreDriver{})) + if kerneltype.IsClassic() { + storage := MustInitStorage("") + defer storage.Close() + require.Nil(t, GetSystemStorage()) + } else { + bak := *config.GetGlobalConfig() + config.GetGlobalConfig().Path = t.TempDir() + config.GetGlobalConfig().KeyspaceName = keyspace.System + t.Cleanup(func() { + config.StoreGlobalConfig(&bak) + }) + + storage := MustInitStorage(keyspace.System) + require.NotNil(t, GetSystemStorage()) + defer storage.Close() + require.Same(t, storage, GetSystemStorage()) + + // for user keyspace, we need to init 2 store with different PATH, we cannot + // do it for uni-store, so skip it. + } +} diff --git a/pkg/util/mock/store.go b/pkg/util/mock/store.go index 6253984ec8..2c4bb425fb 100644 --- a/pkg/util/mock/store.go +++ b/pkg/util/mock/store.go @@ -93,3 +93,8 @@ func (*Store) GetOption(_ any) (any, bool) { // SetOption implements kv.Storage interface. func (*Store) SetOption(_, _ any) {} + +// GetClusterID implements kv.Storage interface. +func (*Store) GetClusterID() uint64 { + return 1 +}