From c85eb69217aa6fbbf15edefc6bef3fc8510bbcf1 Mon Sep 17 00:00:00 2001 From: Yujie Xia Date: Thu, 19 Aug 2021 15:34:01 +0800 Subject: [PATCH] lightning: precheck cluster region and available space (#27082) --- br/pkg/lightning/restore/check_info.go | 209 +++++++++++++++--- br/pkg/lightning/restore/check_template.go | 7 +- br/pkg/lightning/restore/meta_manager.go | 72 ++++-- br/pkg/lightning/restore/meta_manager_test.go | 61 +++++ br/pkg/lightning/restore/restore.go | 16 +- br/pkg/lightning/restore/restore_test.go | 133 ++++++++++- br/pkg/lightning/restore/table_restore.go | 2 +- 7 files changed, 441 insertions(+), 59 deletions(-) diff --git a/br/pkg/lightning/restore/check_info.go b/br/pkg/lightning/restore/check_info.go index 360b62b4c5..a7507cf6a9 100644 --- a/br/pkg/lightning/restore/check_info.go +++ b/br/pkg/lightning/restore/check_info.go @@ -21,6 +21,8 @@ import ( "io" "path/filepath" "reflect" + "sort" + "strconv" "strings" "github.com/docker/go-units" @@ -38,29 +40,27 @@ import ( "github.com/pingcap/tidb/br/pkg/lightning/verification" "github.com/pingcap/tidb/br/pkg/storage" "github.com/pingcap/tidb/table/tables" - "github.com/tikv/pd/pkg/typeutil" "github.com/tikv/pd/server/api" pdconfig "github.com/tikv/pd/server/config" "go.uber.org/zap" ) const ( - pdWriteFlow = "/pd/api/v1/regions/writeflow" - pdReadFlow = "/pd/api/v1/regions/readflow" - - // OnlineBytesLimitation/OnlineKeysLimitation is the statistics of - // Bytes/Keys used per region from pdWriteFlow/pdReadFlow - // this determines whether the cluster has some region that have other loads - // and might influence the import task in the future. - OnlineBytesLimitation = 10 * units.MiB - OnlineKeysLimitation = 5000 - - pdStores = "/pd/api/v1/stores" - pdReplicate = "/pd/api/v1/config/replicate" + pdStores = "/pd/api/v1/stores" + pdReplicate = "/pd/api/v1/config/replicate" + pdEmptyRegions = "/pd/api/v1/regions/check/empty-region" defaultCSVSize = 10 * units.GiB maxSampleDataSize = 10 * 1024 * 1024 maxSampleRowCount = 10 * 1024 + + warnEmptyRegionCntPerStore = 500 + errorEmptyRegionCntPerStore = 1000 + warnRegionCntMinMaxRatio = 0.75 + errorRegionCntMinMaxRatio = 0.5 + + // We only check RegionCntMaxMinRatio when the maximum region count of all stores is larger than this threshold. + checkRegionCntRatioThreshold = 1000 ) func (rc *Controller) isSourceInLocal() bool { @@ -76,6 +76,18 @@ func (rc *Controller) getReplicaCount(ctx context.Context) (uint64, error) { return result.MaxReplicas, nil } +func (rc *Controller) getClusterAvail(ctx context.Context) (uint64, error) { + result := &api.StoresInfo{} + if err := rc.tls.WithHost(rc.cfg.TiDB.PdAddr).GetJSON(ctx, pdStores, result); err != nil { + return 0, errors.Trace(err) + } + clusterAvail := uint64(0) + for _, store := range result.Stores { + clusterAvail += uint64(store.Status.Available) + } + return clusterAvail, nil +} + // ClusterResource check cluster has enough resource to import data. this test can by skipped. 
func (rc *Controller) ClusterResource(ctx context.Context, localSource int64) error { passed := true @@ -84,35 +96,62 @@ func (rc *Controller) ClusterResource(ctx context.Context, localSource int64) er rc.checkTemplate.Collect(Critical, passed, message) }() - result := &api.StoresInfo{} - err := rc.tls.WithHost(rc.cfg.TiDB.PdAddr).GetJSON(ctx, pdStores, result) - if err != nil { - return errors.Trace(err) - } - totalCapacity := typeutil.ByteSize(0) - for _, store := range result.Stores { - totalCapacity += store.Status.Capacity - } - clusterSource := localSource - if rc.taskMgr != nil { - clusterSource, err = rc.taskMgr.CheckClusterSource(ctx) + var ( + clusterAvail uint64 + clusterSource uint64 + ) + if rc.taskMgr == nil { + var err error + clusterAvail, err = rc.getClusterAvail(ctx) if err != nil { return errors.Trace(err) } + clusterSource = uint64(localSource) + } else { + if err := rc.taskMgr.CheckTasksExclusively(ctx, func(tasks []taskMeta) ([]taskMeta, error) { + clusterAvail = 0 + clusterSource = 0 + restoreStarted := false + for _, task := range tasks { + if task.status > taskMetaStatusInitial { + restoreStarted = true + } + clusterSource += task.sourceBytes + if task.clusterAvail > 0 { + clusterAvail = task.clusterAvail + } + } + if restoreStarted || clusterAvail > 0 { + return nil, nil + } + + var err error + clusterAvail, err = rc.getClusterAvail(ctx) + if err != nil { + return nil, errors.Trace(err) + } + newTasks := append([]taskMeta(nil), tasks...) + for i := 0; i < len(newTasks); i++ { + newTasks[i].clusterAvail = clusterAvail + } + return newTasks, nil + }); err != nil { + return errors.Trace(err) + } } replicaCount, err := rc.getReplicaCount(ctx) if err != nil { return errors.Trace(err) } - estimateSize := uint64(clusterSource) * replicaCount - if typeutil.ByteSize(estimateSize) > totalCapacity { + estimateSize := clusterSource * replicaCount + if estimateSize > clusterAvail { passed = false - message = fmt.Sprintf("Cluster doesn't have enough space, capacity is %s, but we need %s", - units.BytesSize(float64(totalCapacity)), units.BytesSize(float64(estimateSize))) + message = fmt.Sprintf("Cluster doesn't have enough space, available is %s, but we need %s", + units.BytesSize(float64(clusterAvail)), units.BytesSize(float64(estimateSize))) } else { - message = fmt.Sprintf("Cluster capacity is rich, capacity is %s, we need %s", - units.BytesSize(float64(totalCapacity)), units.BytesSize(float64(estimateSize))) + message = fmt.Sprintf("Cluster available is rich, available is %s, we need %s", + units.BytesSize(float64(clusterAvail)), units.BytesSize(float64(estimateSize))) } return nil } @@ -139,6 +178,114 @@ func (rc *Controller) ClusterIsAvailable(ctx context.Context) error { return nil } +func (rc *Controller) checkEmptyRegion(ctx context.Context) error { + passed := true + message := "Cluster doesn't have too many empty regions" + defer func() { + rc.checkTemplate.Collect(Critical, passed, message) + }() + + var result api.RegionsInfo + if err := rc.tls.WithHost(rc.cfg.TiDB.PdAddr).GetJSON(ctx, pdEmptyRegions, &result); err != nil { + return errors.Trace(err) + } + regions := make(map[uint64]int) + for _, region := range result.Regions { + for _, peer := range region.Peers { + regions[peer.StoreId]++ + } + } + + var ( + errStores []string + warnStores []string + ) + for storeID, regionCnt := range regions { + if regionCnt > errorEmptyRegionCntPerStore { + errStores = append(errStores, strconv.Itoa(int(storeID))) + } else if regionCnt > warnEmptyRegionCntPerStore { + 
warnStores = append(warnStores, strconv.Itoa(int(storeID))) + } + } + + var messages []string + if len(errStores) > 0 { + passed = false + messages = append(messages, fmt.Sprintf("TiKV stores (%s) contains more than %v empty regions respectively, "+ + "which will greatly affect the import speed and success rate", strings.Join(errStores, ", "), errorEmptyRegionCntPerStore)) + } + if len(warnStores) > 0 { + messages = append(messages, fmt.Sprintf("TiKV stores (%s) contains more than %v empty regions respectively, "+ + "which will affect the import speed and success rate", strings.Join(warnStores, ", "), warnEmptyRegionCntPerStore)) + } + if len(messages) > 0 { + message = strings.Join(messages, "\n") + } + return nil +} + +// checkRegionDistribution checks if regions distribution is unbalanced. +func (rc *Controller) checkRegionDistribution(ctx context.Context) error { + passed := true + message := "Cluster region distribution is balanced" + defer func() { + rc.checkTemplate.Collect(Critical, passed, message) + }() + + result := &api.StoresInfo{} + err := rc.tls.WithHost(rc.cfg.TiDB.PdAddr).GetJSON(ctx, pdStores, result) + if err != nil { + return errors.Trace(err) + } + if len(result.Stores) <= 1 { + return nil + } + sort.Slice(result.Stores, func(i, j int) bool { + return result.Stores[i].Status.RegionCount < result.Stores[j].Status.RegionCount + }) + minStore := result.Stores[0] + maxStore := result.Stores[len(result.Stores)-1] + if maxStore.Status.RegionCount <= checkRegionCntRatioThreshold { + return nil + } + ratio := float64(minStore.Status.RegionCount) / float64(maxStore.Status.RegionCount) + if ratio < errorRegionCntMinMaxRatio { + passed = false + message = fmt.Sprintf("Region distribution is unbalanced, the ratio of the regions count of the store(%v) "+ + "with least regions(%v) to the store(%v) with most regions(%v) is %v, but we expect it must not be less than %v", + minStore.Store.Id, minStore.Status.RegionCount, maxStore.Store.Id, maxStore.Status.RegionCount, ratio, errorRegionCntMinMaxRatio) + } else if ratio < warnRegionCntMinMaxRatio { + message = fmt.Sprintf("Region distribution is unbalanced, the ratio of the regions count of the store(%v) "+ + "with least regions(%v) to the store(%v) with most regions(%v) is %v, but we expect it should not be less than %v", + minStore.Store.Id, minStore.Status.RegionCount, maxStore.Store.Id, maxStore.Status.RegionCount, ratio, warnRegionCntMinMaxRatio) + } + return nil +} + +// CheckClusterRegion checks cluster if there are too many empty regions or region distribution is unbalanced. +func (rc *Controller) CheckClusterRegion(ctx context.Context) error { + err := rc.taskMgr.CheckTasksExclusively(ctx, func(tasks []taskMeta) ([]taskMeta, error) { + restoreStarted := false + for _, task := range tasks { + if task.status > taskMetaStatusInitial { + restoreStarted = true + break + } + } + if restoreStarted { + return nil, nil + } + if err := rc.checkEmptyRegion(ctx); err != nil { + return nil, errors.Trace(err) + } + if err := rc.checkRegionDistribution(ctx); err != nil { + return nil, errors.Trace(err) + } + return nil, nil + }) + return errors.Trace(err) +} + // StoragePermission checks whether Lightning has enough permission to storage. // this test cannot be skipped. 
func (rc *Controller) StoragePermission(ctx context.Context) error { diff --git a/br/pkg/lightning/restore/check_template.go b/br/pkg/lightning/restore/check_template.go index 03fb146e9b..ba63c505a4 100644 --- a/br/pkg/lightning/restore/check_template.go +++ b/br/pkg/lightning/restore/check_template.go @@ -30,8 +30,7 @@ const ( type Template interface { // Collect mainly collect performance related checks' results and critical level checks' results. - // If the performance is not as expect. It will output a warn to user and it won't break the whole import task. - // if one of critical check not passed. it will stop import task. + // If the performance is not as expect or one of critical check not passed. it will stop import task. Collect(t CheckType, passed bool, msg string) // Success represents the whole check has passed or not. @@ -73,9 +72,7 @@ func (c *SimpleTemplate) Collect(t CheckType, passed bool, msg string) { if !passed { switch t { case Critical: - { - c.criticalFailedCount++ - } + c.criticalFailedCount++ case Warn: c.warnFailedCount++ } diff --git a/br/pkg/lightning/restore/meta_manager.go b/br/pkg/lightning/restore/meta_manager.go index 0b63d6184c..15fa6ac549 100644 --- a/br/pkg/lightning/restore/meta_manager.go +++ b/br/pkg/lightning/restore/meta_manager.go @@ -475,8 +475,11 @@ func RemoveTableMetaByTableName(ctx context.Context, db *sql.DB, metaTable, tabl type taskMetaMgr interface { InitTask(ctx context.Context, source int64) error - CheckClusterSource(ctx context.Context) (int64, error) CheckTaskExist(ctx context.Context) (bool, error) + // CheckTasksExclusively check all tasks exclusively. action is the function to check all tasks and returns the tasks + // need to update or any new tasks. There is at most one lightning who can execute the action function at the same time. + // Note that action may be executed multiple times due to transaction retry, caller should make sure it's idempotent. 
+ CheckTasksExclusively(ctx context.Context, action func(tasks []taskMeta) ([]taskMeta, error)) error CheckAndPausePdSchedulers(ctx context.Context) (pdutil.UndoFunc, error) // CheckAndFinishRestore check task meta and return whether to switch cluster to normal state and clean up the metadata // Return values: first boolean indicates whether switch back tidb cluster to normal state (restore schedulers, switch tikv to normal) @@ -541,6 +544,15 @@ func parseTaskMetaStatus(s string) (taskMetaStatus, error) { } } +type taskMeta struct { + taskID int64 + pdCfgs string + status taskMetaStatus + state int + sourceBytes uint64 + clusterAvail uint64 +} + type storedCfgs struct { PauseCfg pdutil.ClusterConfig `json:"paused"` RestoreCfg pdutil.ClusterConfig `json:"restore"` @@ -586,23 +598,57 @@ func (m *dbTaskMetaMgr) CheckTaskExist(ctx context.Context) (bool, error) { return exist, errors.Trace(err) } -func (m *dbTaskMetaMgr) CheckClusterSource(ctx context.Context) (int64, error) { +func (m *dbTaskMetaMgr) CheckTasksExclusively(ctx context.Context, action func(tasks []taskMeta) ([]taskMeta, error)) error { conn, err := m.session.Conn(ctx) if err != nil { - return 0, errors.Trace(err) + return errors.Trace(err) } defer conn.Close() exec := &common.SQLWithRetry{ DB: m.session, Logger: log.L(), } - - source := int64(0) - query := fmt.Sprintf("SELECT SUM(source_bytes) from %s", m.tableName) - if err := exec.QueryRow(ctx, "query total source size", query, &source); err != nil { - return 0, errors.Annotate(err, "fetch task meta failed") + err = exec.Exec(ctx, "enable pessimistic transaction", "SET SESSION tidb_txn_mode = 'pessimistic';") + if err != nil { + return errors.Annotate(err, "enable pessimistic transaction failed") } - return source, nil + return exec.Transact(ctx, "check tasks exclusively", func(ctx context.Context, tx *sql.Tx) error { + query := fmt.Sprintf("SELECT task_id, pd_cfgs, status, state, source_bytes, cluster_avail from %s FOR UPDATE", m.tableName) + rows, err := tx.QueryContext(ctx, query) + if err != nil { + return errors.Annotate(err, "fetch task metas failed") + } + defer rows.Close() + + var tasks []taskMeta + for rows.Next() { + var task taskMeta + var statusValue string + if err = rows.Scan(&task.taskID, &task.pdCfgs, &statusValue, &task.state, &task.sourceBytes, &task.clusterAvail); err != nil { + return errors.Trace(err) + } + status, err := parseTaskMetaStatus(statusValue) + if err != nil { + return errors.Annotatef(err, "invalid task meta status '%s'", statusValue) + } + task.status = status + tasks = append(tasks, task) + } + if err = rows.Err(); err != nil { + return errors.Trace(err) + } + newTasks, err := action(tasks) + if err != nil { + return errors.Trace(err) + } + for _, task := range newTasks { + query := fmt.Sprintf("REPLACE INTO %s (task_id, pd_cfgs, status, state, source_bytes, cluster_avail) VALUES(?, ?, ?, ?, ?, ?)", m.tableName) + if _, err = tx.ExecContext(ctx, query, task.taskID, task.pdCfgs, task.status.String(), task.state, task.sourceBytes, task.clusterAvail); err != nil { + return errors.Trace(err) + } + } + return nil + }) } func (m *dbTaskMetaMgr) CheckAndPausePdSchedulers(ctx context.Context) (pdutil.UndoFunc, error) { @@ -899,6 +945,10 @@ func (m noopTaskMetaMgr) InitTask(ctx context.Context, source int64) error { return nil } +func (m noopTaskMetaMgr) CheckTasksExclusively(ctx context.Context, action func(tasks []taskMeta) ([]taskMeta, error)) error { + return nil +} + func (m noopTaskMetaMgr) CheckAndPausePdSchedulers(ctx context.Context) 
(pdutil.UndoFunc, error) { return func(ctx context.Context) error { return nil @@ -909,10 +959,6 @@ func (m noopTaskMetaMgr) CheckTaskExist(ctx context.Context) (bool, error) { return false, nil } -func (m noopTaskMetaMgr) CheckClusterSource(ctx context.Context) (int64, error) { - return 0, nil -} - func (m noopTaskMetaMgr) CheckAndFinishRestore(context.Context, bool) (bool, bool, error) { return false, true, nil } diff --git a/br/pkg/lightning/restore/meta_manager_test.go b/br/pkg/lightning/restore/meta_manager_test.go index 63d26af300..628ead199d 100644 --- a/br/pkg/lightning/restore/meta_manager_test.go +++ b/br/pkg/lightning/restore/meta_manager_test.go @@ -5,6 +5,7 @@ package restore import ( "context" "database/sql/driver" + "sort" "github.com/DATA-DOG/go-sqlmock" . "github.com/pingcap/check" @@ -262,3 +263,63 @@ func (s *metaMgrSuite) prepareMock(rowsVal [][]driver.Value, nextRowID *int64, u WillReturnResult(sqlmock.NewResult(int64(0), int64(1))) } } + +var _ = Suite(&taskMetaMgrSuite{}) + +type taskMetaMgrSuite struct { + mgr *dbTaskMetaMgr + mockDB sqlmock.Sqlmock +} + +func (s *taskMetaMgrSuite) SetUpTest(c *C) { + db, m, err := sqlmock.New() + c.Assert(err, IsNil) + + s.mgr = &dbTaskMetaMgr{ + session: db, + taskID: 1, + tableName: common.UniqueTable("test", "t1"), + } + s.mockDB = m +} + +func (s *taskMetaMgrSuite) TestCheckTasksExclusively(c *C) { + s.mockDB.ExpectExec("SET SESSION tidb_txn_mode = 'pessimistic';"). + WillReturnResult(sqlmock.NewResult(int64(0), int64(0))) + s.mockDB.ExpectBegin() + s.mockDB.ExpectQuery("SELECT task_id, pd_cfgs, status, state, source_bytes, cluster_avail from `test`.`t1` FOR UPDATE"). + WillReturnRows(sqlmock.NewRows([]string{"task_id", "pd_cfgs", "status", "state", "source_bytes", "cluster_avail"}). + AddRow("0", "", taskMetaStatusInitial.String(), "0", "0", "0"). + AddRow("1", "", taskMetaStatusInitial.String(), "0", "0", "0"). + AddRow("2", "", taskMetaStatusInitial.String(), "0", "0", "0"). + AddRow("3", "", taskMetaStatusInitial.String(), "0", "0", "0"). + AddRow("4", "", taskMetaStatusInitial.String(), "0", "0", "0")) + + s.mockDB.ExpectExec("\\QREPLACE INTO `test`.`t1` (task_id, pd_cfgs, status, state, source_bytes, cluster_avail) VALUES(?, ?, ?, ?, ?, ?)\\E"). + WithArgs(int64(2), "", taskMetaStatusInitial.String(), int(0), uint64(2048), uint64(0)). + WillReturnResult(sqlmock.NewResult(0, 1)) + s.mockDB.ExpectExec("\\QREPLACE INTO `test`.`t1` (task_id, pd_cfgs, status, state, source_bytes, cluster_avail) VALUES(?, ?, ?, ?, ?, ?)\\E"). + WithArgs(int64(3), "", taskMetaStatusInitial.String(), int(0), uint64(3072), uint64(0)). 
+ WillReturnResult(sqlmock.NewResult(0, 1)) + s.mockDB.ExpectCommit() + + err := s.mgr.CheckTasksExclusively(context.Background(), func(tasks []taskMeta) ([]taskMeta, error) { + c.Assert(len(tasks), Equals, 5) + sort.Slice(tasks, func(i, j int) bool { + return tasks[i].taskID < tasks[j].taskID + }) + for j := 0; j < 5; j++ { + c.Assert(tasks[j].taskID, Equals, int64(j)) + } + + var newTasks []taskMeta + for j := 2; j < 4; j++ { + task := tasks[j] + task.sourceBytes = uint64(j * 1024) + newTasks = append(newTasks, task) + } + return newTasks, nil + }) + c.Assert(err, IsNil) + +} diff --git a/br/pkg/lightning/restore/restore.go b/br/pkg/lightning/restore/restore.go index 8462d64eed..d375c95882 100644 --- a/br/pkg/lightning/restore/restore.go +++ b/br/pkg/lightning/restore/restore.go @@ -103,6 +103,7 @@ const ( status VARCHAR(32) NOT NULL, state TINYINT(1) NOT NULL DEFAULT 0 COMMENT '0: normal, 1: exited before finish', source_bytes BIGINT(20) UNSIGNED NOT NULL DEFAULT 0, + cluster_avail BIGINT(20) UNSIGNED NOT NULL DEFAULT 0, PRIMARY KEY (task_id) );` @@ -1763,7 +1764,8 @@ func (rc *Controller) isLocalBackend() bool { // preCheckRequirements checks // 1. Cluster resource // 2. Local node resource -// 3. Lightning configuration +// 3. Cluster region +// 4. Lightning configuration // before restore tables start. func (rc *Controller) preCheckRequirements(ctx context.Context) error { if err := rc.ClusterIsAvailable(ctx); err != nil { @@ -1779,11 +1781,6 @@ func (rc *Controller) preCheckRequirements(ctx context.Context) error { taskExist := false if rc.isLocalBackend() { - source, err := rc.EstimateSourceData(ctx) - if err != nil { - return errors.Trace(err) - } - pdController, err := pdutil.NewPdController(ctx, rc.cfg.TiDB.PdAddr, rc.tls.TLSConfig(), rc.tls.ToPDSecurityOption()) if err != nil { @@ -1796,6 +1793,10 @@ func (rc *Controller) preCheckRequirements(ctx context.Context) error { return errors.Trace(err) } if !taskExist { + source, err := rc.EstimateSourceData(ctx) + if err != nil { + return errors.Trace(err) + } err = rc.LocalResource(ctx, source) if err != nil { rc.taskMgr.CleanupTask(ctx) @@ -1805,6 +1806,9 @@ func (rc *Controller) preCheckRequirements(ctx context.Context) error { rc.taskMgr.CleanupTask(ctx) return errors.Trace(err) } + if err := rc.CheckClusterRegion(ctx); err != nil { + return errors.Trace(err) + } } } if rc.cfg.App.CheckRequirements && rc.tidbGlue.OwnsSQLExecutor() { diff --git a/br/pkg/lightning/restore/restore_test.go b/br/pkg/lightning/restore/restore_test.go index 9fb10e7792..7758d7d81a 100644 --- a/br/pkg/lightning/restore/restore_test.go +++ b/br/pkg/lightning/restore/restore_test.go @@ -16,6 +16,7 @@ package restore import ( "context" + "encoding/json" "fmt" "net/http" "net/http/httptest" @@ -34,6 +35,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/import_kvpb" + "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/parser" "github.com/pingcap/parser/ast" "github.com/pingcap/parser/model" @@ -60,6 +62,7 @@ import ( "github.com/pingcap/tidb/br/pkg/version/build" "github.com/pingcap/tidb/ddl" tmock "github.com/pingcap/tidb/util/mock" + "github.com/tikv/pd/server/api" ) var _ = Suite(&restoreSuite{}) @@ -1598,7 +1601,7 @@ func (s *tableRestoreSuite) TestCheckClusterResource(c *C) { "id": 2 }, "status": { - "capacity": "24" + "available": "24" } } ] @@ -1606,7 +1609,7 @@ func (s *tableRestoreSuite) TestCheckClusterResource(c *C) { []byte(`{ "max-replicas": 1 }`), - "(.*)Cluster capacity is 
rich(.*)", + "(.*)Cluster available is rich(.*)", true, 0, }, @@ -1619,7 +1622,7 @@ func (s *tableRestoreSuite) TestCheckClusterResource(c *C) { "id": 2 }, "status": { - "capacity": "15" + "available": "15" } } ] @@ -1680,6 +1683,130 @@ func (s *tableRestoreSuite) TestCheckClusterResource(c *C) { } } +type mockTaskMetaMgr struct { + taskMetaMgr +} + +func (mockTaskMetaMgr) CheckTasksExclusively(ctx context.Context, action func(tasks []taskMeta) ([]taskMeta, error)) error { + _, err := action([]taskMeta{{ + taskID: 1, + pdCfgs: "", + status: taskMetaStatusInitial, + state: taskStateNormal, + }}) + return err +} + +func (s *tableRestoreSuite) TestCheckClusterRegion(c *C) { + type testCase struct { + stores api.StoresInfo + emptyRegions api.RegionsInfo + expectMsgs []string + expectResult bool + expectErrorCnt int + } + + makeRegions := func(regionCnt int, storeID uint64) []api.RegionInfo { + var regions []api.RegionInfo + for i := 0; i < regionCnt; i++ { + regions = append(regions, api.RegionInfo{Peers: []api.MetaPeer{{Peer: &metapb.Peer{StoreId: storeID}}}}) + } + return regions + } + + testCases := []testCase{ + { + stores: api.StoresInfo{Stores: []*api.StoreInfo{ + {Store: &api.MetaStore{Store: &metapb.Store{Id: 1}}, Status: &api.StoreStatus{RegionCount: 200}}, + }}, + emptyRegions: api.RegionsInfo{ + Regions: append([]api.RegionInfo(nil), makeRegions(100, 1)...), + }, + expectMsgs: []string{".*Cluster doesn't have too many empty regions.*", ".*Cluster region distribution is balanced.*"}, + expectResult: true, + expectErrorCnt: 0, + }, + { + stores: api.StoresInfo{Stores: []*api.StoreInfo{ + {Store: &api.MetaStore{Store: &metapb.Store{Id: 1}}, Status: &api.StoreStatus{RegionCount: 2000}}, + {Store: &api.MetaStore{Store: &metapb.Store{Id: 2}}, Status: &api.StoreStatus{RegionCount: 3100}}, + {Store: &api.MetaStore{Store: &metapb.Store{Id: 3}}, Status: &api.StoreStatus{RegionCount: 2500}}, + }}, + emptyRegions: api.RegionsInfo{ + Regions: append(append(append([]api.RegionInfo(nil), + makeRegions(600, 1)...), + makeRegions(300, 2)...), + makeRegions(1200, 3)...), + }, + expectMsgs: []string{ + ".*TiKV stores \\(3\\) contains more than 1000 empty regions respectively.*", + ".*TiKV stores \\(1\\) contains more than 500 empty regions respectively.*", + ".*Region distribution is unbalanced.*but we expect it should not be less than 0.75.*", + }, + expectResult: false, + expectErrorCnt: 1, + }, + { + stores: api.StoresInfo{Stores: []*api.StoreInfo{ + {Store: &api.MetaStore{Store: &metapb.Store{Id: 1}}, Status: &api.StoreStatus{RegionCount: 1200}}, + {Store: &api.MetaStore{Store: &metapb.Store{Id: 2}}, Status: &api.StoreStatus{RegionCount: 3000}}, + {Store: &api.MetaStore{Store: &metapb.Store{Id: 3}}, Status: &api.StoreStatus{RegionCount: 2500}}, + }}, + expectMsgs: []string{".*Region distribution is unbalanced.*but we expect it must not be less than 0.5.*"}, + expectResult: false, + expectErrorCnt: 1, + }, + { + stores: api.StoresInfo{Stores: []*api.StoreInfo{ + {Store: &api.MetaStore{Store: &metapb.Store{Id: 1}}, Status: &api.StoreStatus{RegionCount: 0}}, + {Store: &api.MetaStore{Store: &metapb.Store{Id: 2}}, Status: &api.StoreStatus{RegionCount: 2800}}, + {Store: &api.MetaStore{Store: &metapb.Store{Id: 3}}, Status: &api.StoreStatus{RegionCount: 2500}}, + }}, + expectMsgs: []string{".*Region distribution is unbalanced.*but we expect it must not be less than 0.5.*"}, + expectResult: false, + expectErrorCnt: 1, + }, + } + + mustMarshal := func(v interface{}) []byte { + data, err := json.Marshal(v) + 
c.Assert(err, IsNil) + return data + } + + for _, ca := range testCases { + server := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + var err error + if req.URL.Path == pdStores { + _, err = w.Write(mustMarshal(ca.stores)) + } else if req.URL.Path == pdEmptyRegions { + _, err = w.Write(mustMarshal(ca.emptyRegions)) + } else { + w.WriteHeader(http.StatusNotFound) + } + c.Assert(err, IsNil) + })) + + tls := common.NewTLSFromMockServer(server) + template := NewSimpleTemplate() + + url := strings.TrimPrefix(server.URL, "https://") + cfg := &config.Config{TiDB: config.DBStore{PdAddr: url}} + rc := &Controller{cfg: cfg, tls: tls, taskMgr: mockTaskMetaMgr{}, checkTemplate: template} + + err := rc.CheckClusterRegion(context.Background()) + c.Assert(err, IsNil) + c.Assert(template.FailedCount(Critical), Equals, ca.expectErrorCnt) + c.Assert(template.Success(), Equals, ca.expectResult) + + for _, expectMsg := range ca.expectMsgs { + c.Assert(strings.ReplaceAll(template.Output(), "\n", ""), Matches, expectMsg) + } + + server.Close() + } +} + func (s *tableRestoreSuite) TestCheckHasLargeCSV(c *C) { cases := []struct { strictFormat bool diff --git a/br/pkg/lightning/restore/table_restore.go b/br/pkg/lightning/restore/table_restore.go index 726856da51..ecc22ead59 100644 --- a/br/pkg/lightning/restore/table_restore.go +++ b/br/pkg/lightning/restore/table_restore.go @@ -132,7 +132,7 @@ func (tr *TableRestore) populateChunks(ctx context.Context, rc *Controller, cp * return err } -func (t *TableRestore) RebaseChunkRowIDs(cp *checkpoints.TableCheckpoint, rowIDBase int64) { +func (tr *TableRestore) RebaseChunkRowIDs(cp *checkpoints.TableCheckpoint, rowIDBase int64) { if rowIDBase == 0 { return }
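
The new available-space precheck in `ClusterResource` reduces to one comparison: the sum of `Status.Available` over all TiKV stores (read from PD's `/pd/api/v1/stores`) must cover the estimated source size multiplied by the cluster's `max-replicas`. The minimal sketch below isolates that arithmetic only; `checkClusterSpace` and its plain `uint64` inputs are illustrative stand-ins, not the patch's actual code, which pulls the sizes from PD and the task-meta table.

```go
package main

import "fmt"

// checkClusterSpace mirrors the estimate behind the new ClusterResource
// precheck: imported data is written replicaCount times, so the sum of the
// stores' available bytes must cover sourceBytes * replicaCount.
func checkClusterSpace(storeAvail []uint64, sourceBytes, replicaCount uint64) (bool, string) {
	var clusterAvail uint64
	for _, avail := range storeAvail {
		clusterAvail += avail
	}
	estimate := sourceBytes * replicaCount
	if estimate > clusterAvail {
		return false, fmt.Sprintf("cluster doesn't have enough space: available %d, need %d", clusterAvail, estimate)
	}
	return true, fmt.Sprintf("cluster available %d covers the estimated %d", clusterAvail, estimate)
}

func main() {
	// Three stores with 40 GiB free each; importing 30 GiB with max-replicas = 3.
	const gib = 1 << 30
	ok, msg := checkClusterSpace([]uint64{40 * gib, 40 * gib, 40 * gib}, 30*gib, 3)
	fmt.Println(ok, msg)
}
```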
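
`checkEmptyRegion` and `checkRegionDistribution` both come down to threshold comparisons against the constants added at the top of `check_info.go`. The sketch below restates those two decisions as pure functions over per-store counts; `emptyRegionLevel` and `regionDistributionLevel` are hypothetical helpers for illustration and omit the PD queries and message formatting the real checks perform.

```go
package main

import (
	"fmt"
	"sort"
)

// Thresholds copied from the constants added in check_info.go.
const (
	warnEmptyRegionCntPerStore   = 500
	errorEmptyRegionCntPerStore  = 1000
	warnRegionCntMinMaxRatio     = 0.75
	errorRegionCntMinMaxRatio    = 0.5
	checkRegionCntRatioThreshold = 1000
)

// emptyRegionLevel classifies one store's empty-region count the way
// checkEmptyRegion does: above 1000 is an error, above 500 a warning.
func emptyRegionLevel(cnt int) string {
	switch {
	case cnt > errorEmptyRegionCntPerStore:
		return "error"
	case cnt > warnEmptyRegionCntPerStore:
		return "warn"
	default:
		return "ok"
	}
}

// regionDistributionLevel applies the min/max region-count ratio rule from
// checkRegionDistribution; the check is skipped when the busiest store has
// at most checkRegionCntRatioThreshold regions.
func regionDistributionLevel(regionCounts []int) string {
	if len(regionCounts) <= 1 {
		return "ok"
	}
	counts := append([]int(nil), regionCounts...)
	sort.Ints(counts)
	minCnt, maxCnt := counts[0], counts[len(counts)-1]
	if maxCnt <= checkRegionCntRatioThreshold {
		return "ok"
	}
	ratio := float64(minCnt) / float64(maxCnt)
	switch {
	case ratio < errorRegionCntMinMaxRatio:
		return "error"
	case ratio < warnRegionCntMinMaxRatio:
		return "warn"
	default:
		return "ok"
	}
}

func main() {
	fmt.Println(emptyRegionLevel(1200))                           // error
	fmt.Println(regionDistributionLevel([]int{1200, 3000, 2500})) // error: 1200/3000 = 0.4 < 0.5
}
```

The second example in `main` matches the unbalanced-distribution case exercised by `TestCheckClusterRegion` (stores with 1200, 3000, and 2500 regions).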
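
`CheckTasksExclusively` is the coordination primitive the prechecks build on: at most one Lightning instance at a time inspects and rewrites the task-meta rows, enforced by `SELECT ... FOR UPDATE` inside a pessimistic transaction. The following is a simplified sketch of that pattern, assuming a hypothetical three-column task table and leaving out the `common.SQLWithRetry` wrapper and full schema the patch uses.

```go
package sketch

import (
	"context"
	"database/sql"
	"fmt"
)

// taskRow is a trimmed-down stand-in for the patch's unexported taskMeta struct.
type taskRow struct {
	TaskID       int64
	SourceBytes  uint64
	ClusterAvail uint64
}

// checkTasksExclusively sketches the locking pattern of
// dbTaskMetaMgr.CheckTasksExclusively: read every task row under
// SELECT ... FOR UPDATE in a pessimistic transaction, hand the rows to the
// callback, and write back whatever rows it returns. tableName is assumed to
// be a trusted, pre-quoted identifier.
func checkTasksExclusively(ctx context.Context, db *sql.DB, tableName string,
	action func(tasks []taskRow) ([]taskRow, error)) error {
	// Pin one connection so the session variable applies to the transaction below.
	conn, err := db.Conn(ctx)
	if err != nil {
		return err
	}
	defer conn.Close()

	// Pessimistic mode makes FOR UPDATE take row locks immediately on TiDB.
	if _, err := conn.ExecContext(ctx, "SET SESSION tidb_txn_mode = 'pessimistic'"); err != nil {
		return err
	}
	tx, err := conn.BeginTx(ctx, nil)
	if err != nil {
		return err
	}
	defer tx.Rollback() // no-op once Commit has succeeded

	rows, err := tx.QueryContext(ctx,
		fmt.Sprintf("SELECT task_id, source_bytes, cluster_avail FROM %s FOR UPDATE", tableName))
	if err != nil {
		return err
	}
	var tasks []taskRow
	for rows.Next() {
		var t taskRow
		if err := rows.Scan(&t.TaskID, &t.SourceBytes, &t.ClusterAvail); err != nil {
			rows.Close()
			return err
		}
		tasks = append(tasks, t)
	}
	// Close the cursor before issuing more statements on the same connection.
	if err := rows.Close(); err != nil {
		return err
	}
	if err := rows.Err(); err != nil {
		return err
	}

	newTasks, err := action(tasks)
	if err != nil {
		return err
	}
	for _, t := range newTasks {
		if _, err := tx.ExecContext(ctx,
			fmt.Sprintf("REPLACE INTO %s (task_id, source_bytes, cluster_avail) VALUES (?, ?, ?)", tableName),
			t.TaskID, t.SourceBytes, t.ClusterAvail); err != nil {
			return err
		}
	}
	return tx.Commit()
}
```

Because the surrounding retry logic may run the transaction more than once, the `action` callback has to stay idempotent — the same caveat the patch documents on the `taskMetaMgr` interface.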
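
`TestCheckClusterRegion` drives the new checks against an `httptest` server that impersonates PD. The stripped-down mock below shows the same routing idea with plain HTTP and arbitrary canned payloads; `fakePD` and its map payloads are illustrative only and far simpler than the test's `api.StoresInfo`/`api.RegionsInfo` fixtures and TLS setup.

```go
package main

import (
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
)

// fakePD serves canned JSON for the two PD endpoints the new prechecks read.
func fakePD(stores, emptyRegions interface{}) *httptest.Server {
	return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
		var payload interface{}
		switch req.URL.Path {
		case "/pd/api/v1/stores":
			payload = stores
		case "/pd/api/v1/regions/check/empty-region":
			payload = emptyRegions
		default:
			w.WriteHeader(http.StatusNotFound)
			return
		}
		_ = json.NewEncoder(w).Encode(payload)
	}))
}

func main() {
	srv := fakePD(
		map[string]interface{}{"count": 1},
		map[string]interface{}{"count": 0},
	)
	defer srv.Close()

	resp, err := http.Get(srv.URL + "/pd/api/v1/stores")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	body, _ := io.ReadAll(resp.Body)
	fmt.Print(string(body)) // {"count":1}
}
```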