lightning: precheck cluster region and available space (#27082)
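The change replaces Lightning's capacity-based cluster space precheck with one based on the available bytes reported by each TiKV store, caches that value (together with each task's estimated source size) in the shared task meta table through a new CheckTasksExclusively primitive, and adds two region prechecks that run before any restore starts: one flagging stores with too many empty regions, and one flagging an unbalanced region count across stores.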
@@ -21,6 +21,8 @@ import (
 	"io"
 	"path/filepath"
 	"reflect"
+	"sort"
+	"strconv"
 	"strings"

 	"github.com/docker/go-units"
@@ -38,29 +40,27 @@ import (
 	"github.com/pingcap/tidb/br/pkg/lightning/verification"
 	"github.com/pingcap/tidb/br/pkg/storage"
 	"github.com/pingcap/tidb/table/tables"
 	"github.com/tikv/pd/pkg/typeutil"
 	"github.com/tikv/pd/server/api"
 	pdconfig "github.com/tikv/pd/server/config"
 	"go.uber.org/zap"
 )

 const (
-	pdWriteFlow = "/pd/api/v1/regions/writeflow"
-	pdReadFlow  = "/pd/api/v1/regions/readflow"
-
-	// OnlineBytesLimitation/OnlineKeysLimitation is the statistics of
-	// Bytes/Keys used per region from pdWriteFlow/pdReadFlow
-	// this determines whether the cluster has some region that have other loads
-	// and might influence the import task in the future.
-	OnlineBytesLimitation = 10 * units.MiB
-	OnlineKeysLimitation  = 5000
-
-	pdStores    = "/pd/api/v1/stores"
-	pdReplicate = "/pd/api/v1/config/replicate"
+	pdStores       = "/pd/api/v1/stores"
+	pdReplicate    = "/pd/api/v1/config/replicate"
+	pdEmptyRegions = "/pd/api/v1/regions/check/empty-region"

 	defaultCSVSize    = 10 * units.GiB
 	maxSampleDataSize = 10 * 1024 * 1024
 	maxSampleRowCount = 10 * 1024
+
+	warnEmptyRegionCntPerStore  = 500
+	errorEmptyRegionCntPerStore = 1000
+	warnRegionCntMinMaxRatio    = 0.75
+	errorRegionCntMinMaxRatio   = 0.5
+
+	// We only check RegionCntMaxMinRatio when the maximum region count of all stores is larger than this threshold.
+	checkRegionCntRatioThreshold = 1000
 )

 func (rc *Controller) isSourceInLocal() bool {
@@ -76,6 +76,18 @@ func (rc *Controller) getReplicaCount(ctx context.Context) (uint64, error) {
 	return result.MaxReplicas, nil
 }

+func (rc *Controller) getClusterAvail(ctx context.Context) (uint64, error) {
+	result := &api.StoresInfo{}
+	if err := rc.tls.WithHost(rc.cfg.TiDB.PdAddr).GetJSON(ctx, pdStores, result); err != nil {
+		return 0, errors.Trace(err)
+	}
+	clusterAvail := uint64(0)
+	for _, store := range result.Stores {
+		clusterAvail += uint64(store.Status.Available)
+	}
+	return clusterAvail, nil
+}
+
 // ClusterResource check cluster has enough resource to import data. this test can by skipped.
 func (rc *Controller) ClusterResource(ctx context.Context, localSource int64) error {
 	passed := true
@@ -84,35 +96,62 @@ func (rc *Controller) ClusterResource(ctx context.Context, localSource int64) error {
 		rc.checkTemplate.Collect(Critical, passed, message)
 	}()

-	result := &api.StoresInfo{}
-	err := rc.tls.WithHost(rc.cfg.TiDB.PdAddr).GetJSON(ctx, pdStores, result)
-	if err != nil {
-		return errors.Trace(err)
-	}
-	totalCapacity := typeutil.ByteSize(0)
-	for _, store := range result.Stores {
-		totalCapacity += store.Status.Capacity
-	}
-	clusterSource := localSource
-	if rc.taskMgr != nil {
-		clusterSource, err = rc.taskMgr.CheckClusterSource(ctx)
-		if err != nil {
+	var (
+		clusterAvail  uint64
+		clusterSource uint64
+	)
+	if rc.taskMgr == nil {
+		var err error
+		clusterAvail, err = rc.getClusterAvail(ctx)
+		if err != nil {
+			return errors.Trace(err)
+		}
+		clusterSource = uint64(localSource)
+	} else {
+		if err := rc.taskMgr.CheckTasksExclusively(ctx, func(tasks []taskMeta) ([]taskMeta, error) {
+			clusterAvail = 0
+			clusterSource = 0
+			restoreStarted := false
+			for _, task := range tasks {
+				if task.status > taskMetaStatusInitial {
+					restoreStarted = true
+				}
+				clusterSource += task.sourceBytes
+				if task.clusterAvail > 0 {
+					clusterAvail = task.clusterAvail
+				}
+			}
+			if restoreStarted || clusterAvail > 0 {
+				return nil, nil
+			}
+
+			var err error
+			clusterAvail, err = rc.getClusterAvail(ctx)
+			if err != nil {
+				return nil, errors.Trace(err)
+			}
+			newTasks := append([]taskMeta(nil), tasks...)
+			for i := 0; i < len(newTasks); i++ {
+				newTasks[i].clusterAvail = clusterAvail
+			}
+			return newTasks, nil
+		}); err != nil {
 			return errors.Trace(err)
 		}
 	}

 	replicaCount, err := rc.getReplicaCount(ctx)
 	if err != nil {
 		return errors.Trace(err)
 	}
-	estimateSize := uint64(clusterSource) * replicaCount
-	if typeutil.ByteSize(estimateSize) > totalCapacity {
+	estimateSize := clusterSource * replicaCount
+	if estimateSize > clusterAvail {
 		passed = false
-		message = fmt.Sprintf("Cluster doesn't have enough space, capacity is %s, but we need %s",
-			units.BytesSize(float64(totalCapacity)), units.BytesSize(float64(estimateSize)))
+		message = fmt.Sprintf("Cluster doesn't have enough space, available is %s, but we need %s",
+			units.BytesSize(float64(clusterAvail)), units.BytesSize(float64(estimateSize)))
 	} else {
-		message = fmt.Sprintf("Cluster capacity is rich, capacity is %s, we need %s",
-			units.BytesSize(float64(totalCapacity)), units.BytesSize(float64(estimateSize)))
+		message = fmt.Sprintf("Cluster available is rich, available is %s, we need %s",
+			units.BytesSize(float64(clusterAvail)), units.BytesSize(float64(estimateSize)))
 	}
 	return nil
 }
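To make the new admission rule concrete, here is a minimal standalone sketch of the same arithmetic (not part of the commit; all numbers are hypothetical): the import passes only when the source size multiplied by the replica count fits into the summed available bytes of the stores.

package main

import "fmt"

func main() {
	// Hypothetical figures: 200 GiB of source data, max-replicas = 3,
	// and three TiKV stores each reporting 180 GiB available.
	sourceBytes := uint64(200) << 30
	replicaCount := uint64(3)
	clusterAvail := uint64(3) * (uint64(180) << 30)

	estimateSize := sourceBytes * replicaCount // same formula as ClusterResource above
	fmt.Printf("need %d GiB, available %d GiB, passed=%v\n",
		estimateSize>>30, clusterAvail>>30, estimateSize <= clusterAvail)
	// Output: need 600 GiB, available 540 GiB, passed=false
}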
@@ -139,6 +178,114 @@ func (rc *Controller) ClusterIsAvailable(ctx context.Context) error {
 	return nil
 }

+func (rc *Controller) checkEmptyRegion(ctx context.Context) error {
+	passed := true
+	message := "Cluster doesn't have too many empty regions"
+	defer func() {
+		rc.checkTemplate.Collect(Critical, passed, message)
+	}()
+
+	var result api.RegionsInfo
+	if err := rc.tls.WithHost(rc.cfg.TiDB.PdAddr).GetJSON(ctx, pdEmptyRegions, &result); err != nil {
+		return errors.Trace(err)
+	}
+	regions := make(map[uint64]int)
+	for _, region := range result.Regions {
+		for _, peer := range region.Peers {
+			regions[peer.StoreId]++
+		}
+	}
+
+	var (
+		errStores  []string
+		warnStores []string
+	)
+	for storeID, regionCnt := range regions {
+		if regionCnt > errorEmptyRegionCntPerStore {
+			errStores = append(errStores, strconv.Itoa(int(storeID)))
+		} else if regionCnt > warnEmptyRegionCntPerStore {
+			warnStores = append(warnStores, strconv.Itoa(int(storeID)))
+		}
+	}
+
+	var messages []string
+	if len(errStores) > 0 {
+		passed = false
+		messages = append(messages, fmt.Sprintf("TiKV stores (%s) contains more than %v empty regions respectively, "+
+			"which will greatly affect the import speed and success rate", strings.Join(errStores, ", "), errorEmptyRegionCntPerStore))
+	}
+	if len(warnStores) > 0 {
+		messages = append(messages, fmt.Sprintf("TiKV stores (%s) contains more than %v empty regions respectively, "+
+			"which will affect the import speed and success rate", strings.Join(warnStores, ", "), warnEmptyRegionCntPerStore))
+	}
+	if len(messages) > 0 {
+		message = strings.Join(messages, "\n")
+	}
+	return nil
+}
+
+// checkRegionDistribution checks if regions distribution is unbalanced.
+func (rc *Controller) checkRegionDistribution(ctx context.Context) error {
+	passed := true
+	message := "Cluster region distribution is balanced"
+	defer func() {
+		rc.checkTemplate.Collect(Critical, passed, message)
+	}()
+
+	result := &api.StoresInfo{}
+	err := rc.tls.WithHost(rc.cfg.TiDB.PdAddr).GetJSON(ctx, pdStores, result)
+	if err != nil {
+		return errors.Trace(err)
+	}
+	if len(result.Stores) <= 1 {
+		return nil
+	}
+	sort.Slice(result.Stores, func(i, j int) bool {
+		return result.Stores[i].Status.RegionCount < result.Stores[j].Status.RegionCount
+	})
+	minStore := result.Stores[0]
+	maxStore := result.Stores[len(result.Stores)-1]
+	if maxStore.Status.RegionCount <= checkRegionCntRatioThreshold {
+		return nil
+	}
+	ratio := float64(minStore.Status.RegionCount) / float64(maxStore.Status.RegionCount)
+	if ratio < errorRegionCntMinMaxRatio {
+		passed = false
+		message = fmt.Sprintf("Region distribution is unbalanced, the ratio of the regions count of the store(%v) "+
+			"with least regions(%v) to the store(%v) with most regions(%v) is %v, but we expect it must not be less than %v",
+			minStore.Store.Id, minStore.Status.RegionCount, maxStore.Store.Id, maxStore.Status.RegionCount, ratio, errorRegionCntMinMaxRatio)
+	} else if ratio < warnRegionCntMinMaxRatio {
+		message = fmt.Sprintf("Region distribution is unbalanced, the ratio of the regions count of the store(%v) "+
+			"with least regions(%v) to the store(%v) with most regions(%v) is %v, but we expect it should not be less than %v",
+			minStore.Store.Id, minStore.Status.RegionCount, maxStore.Store.Id, maxStore.Status.RegionCount, ratio, warnRegionCntMinMaxRatio)
+	}
+	return nil
+}
+
+// CheckClusterRegion checks cluster if there are too many empty regions or region distribution is unbalanced.
+func (rc *Controller) CheckClusterRegion(ctx context.Context) error {
+	err := rc.taskMgr.CheckTasksExclusively(ctx, func(tasks []taskMeta) ([]taskMeta, error) {
+		restoreStarted := false
+		for _, task := range tasks {
+			if task.status > taskMetaStatusInitial {
+				restoreStarted = true
+				break
+			}
+		}
+		if restoreStarted {
+			return nil, nil
+		}
+		if err := rc.checkEmptyRegion(ctx); err != nil {
+			return nil, errors.Trace(err)
+		}
+		if err := rc.checkRegionDistribution(ctx); err != nil {
+			return nil, errors.Trace(err)
+		}
+		return nil, nil
+	})
+	return errors.Trace(err)
+}
+
 // StoragePermission checks whether Lightning has enough permission to storage.
 // this test cannot be skipped.
 func (rc *Controller) StoragePermission(ctx context.Context) error {
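To see the thresholds in action with the numbers used by the tests further below: stores holding 2000, 3100, and 2500 regions give a min/max ratio of 2000/3100 ≈ 0.65, which is above errorRegionCntMinMaxRatio (0.5) but below warnRegionCntMinMaxRatio (0.75), so checkRegionDistribution only warns; in that same test case the critical failure comes from checkEmptyRegion instead, because one store carries 1200 empty regions, more than errorEmptyRegionCntPerStore (1000).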
@@ -30,8 +30,7 @@ const (

 type Template interface {
 	// Collect mainly collect performance related checks' results and critical level checks' results.
-	// If the performance is not as expect. It will output a warn to user and it won't break the whole import task.
-	// if one of critical check not passed. it will stop import task.
+	// If the performance is not as expect or one of critical check not passed. it will stop import task.
 	Collect(t CheckType, passed bool, msg string)

 	// Success represents the whole check has passed or not.
@@ -73,9 +72,7 @@ func (c *SimpleTemplate) Collect(t CheckType, passed bool, msg string) {
 	if !passed {
 		switch t {
 		case Critical:
-			{
-				c.criticalFailedCount++
-			}
+			c.criticalFailedCount++
 		case Warn:
 			c.warnFailedCount++
 		}
@@ -475,8 +475,11 @@ func RemoveTableMetaByTableName(ctx context.Context, db *sql.DB, metaTable, tableName string) error {

 type taskMetaMgr interface {
 	InitTask(ctx context.Context, source int64) error
-	CheckClusterSource(ctx context.Context) (int64, error)
 	CheckTaskExist(ctx context.Context) (bool, error)
+	// CheckTasksExclusively check all tasks exclusively. action is the function to check all tasks and returns the tasks
+	// need to update or any new tasks. There is at most one lightning who can execute the action function at the same time.
+	// Note that action may be executed multiple times due to transaction retry, caller should make sure it's idempotent.
+	CheckTasksExclusively(ctx context.Context, action func(tasks []taskMeta) ([]taskMeta, error)) error
 	CheckAndPausePdSchedulers(ctx context.Context) (pdutil.UndoFunc, error)
 	// CheckAndFinishRestore check task meta and return whether to switch cluster to normal state and clean up the metadata
 	// Return values: first boolean indicates whether switch back tidb cluster to normal state (restore schedulers, switch tikv to normal)
|
||||
}
|
||||
}
|
||||
|
||||
type taskMeta struct {
|
||||
taskID int64
|
||||
pdCfgs string
|
||||
status taskMetaStatus
|
||||
state int
|
||||
sourceBytes uint64
|
||||
clusterAvail uint64
|
||||
}
|
||||
|
||||
type storedCfgs struct {
|
||||
PauseCfg pdutil.ClusterConfig `json:"paused"`
|
||||
RestoreCfg pdutil.ClusterConfig `json:"restore"`
|
||||
@@ -586,23 +598,57 @@ func (m *dbTaskMetaMgr) CheckTaskExist(ctx context.Context) (bool, error) {
 	return exist, errors.Trace(err)
 }

-func (m *dbTaskMetaMgr) CheckClusterSource(ctx context.Context) (int64, error) {
+func (m *dbTaskMetaMgr) CheckTasksExclusively(ctx context.Context, action func(tasks []taskMeta) ([]taskMeta, error)) error {
 	conn, err := m.session.Conn(ctx)
 	if err != nil {
-		return 0, errors.Trace(err)
+		return errors.Trace(err)
 	}
 	defer conn.Close()
 	exec := &common.SQLWithRetry{
 		DB:     m.session,
 		Logger: log.L(),
 	}
-
-	source := int64(0)
-	query := fmt.Sprintf("SELECT SUM(source_bytes) from %s", m.tableName)
-	if err := exec.QueryRow(ctx, "query total source size", query, &source); err != nil {
-		return 0, errors.Annotate(err, "fetch task meta failed")
+	err = exec.Exec(ctx, "enable pessimistic transaction", "SET SESSION tidb_txn_mode = 'pessimistic';")
+	if err != nil {
+		return errors.Annotate(err, "enable pessimistic transaction failed")
 	}
-	return source, nil
+	return exec.Transact(ctx, "check tasks exclusively", func(ctx context.Context, tx *sql.Tx) error {
+		query := fmt.Sprintf("SELECT task_id, pd_cfgs, status, state, source_bytes, cluster_avail from %s FOR UPDATE", m.tableName)
+		rows, err := tx.QueryContext(ctx, query)
+		if err != nil {
+			return errors.Annotate(err, "fetch task metas failed")
+		}
+		defer rows.Close()
+
+		var tasks []taskMeta
+		for rows.Next() {
+			var task taskMeta
+			var statusValue string
+			if err = rows.Scan(&task.taskID, &task.pdCfgs, &statusValue, &task.state, &task.sourceBytes, &task.clusterAvail); err != nil {
+				return errors.Trace(err)
+			}
+			status, err := parseTaskMetaStatus(statusValue)
+			if err != nil {
+				return errors.Annotatef(err, "invalid task meta status '%s'", statusValue)
+			}
+			task.status = status
+			tasks = append(tasks, task)
+		}
+		if err = rows.Err(); err != nil {
+			return errors.Trace(err)
+		}
+		newTasks, err := action(tasks)
+		if err != nil {
+			return errors.Trace(err)
+		}
+		for _, task := range newTasks {
+			query := fmt.Sprintf("REPLACE INTO %s (task_id, pd_cfgs, status, state, source_bytes, cluster_avail) VALUES(?, ?, ?, ?, ?, ?)", m.tableName)
+			if _, err = tx.ExecContext(ctx, query, task.taskID, task.pdCfgs, task.status.String(), task.state, task.sourceBytes, task.clusterAvail); err != nil {
+				return errors.Trace(err)
+			}
+		}
+		return nil
+	})
 }

 func (m *dbTaskMetaMgr) CheckAndPausePdSchedulers(ctx context.Context) (pdutil.UndoFunc, error) {
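The mutual exclusion above comes from running the whole read-modify-write cycle inside a single pessimistic transaction: the SELECT ... FOR UPDATE locks every row of the task meta table, so concurrent Lightning instances queue up behind each other, and REPLACE INTO then persists only the rows the action chose to return. Since the transaction may be retried, the action callback must be idempotent, which is exactly what the interface comment above demands.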
@@ -899,6 +945,10 @@ func (m noopTaskMetaMgr) InitTask(ctx context.Context, source int64) error {
 	return nil
 }

+func (m noopTaskMetaMgr) CheckTasksExclusively(ctx context.Context, action func(tasks []taskMeta) ([]taskMeta, error)) error {
+	return nil
+}
+
 func (m noopTaskMetaMgr) CheckAndPausePdSchedulers(ctx context.Context) (pdutil.UndoFunc, error) {
 	return func(ctx context.Context) error {
 		return nil
@@ -909,10 +959,6 @@ func (m noopTaskMetaMgr) CheckTaskExist(ctx context.Context) (bool, error) {
 	return false, nil
 }

-func (m noopTaskMetaMgr) CheckClusterSource(ctx context.Context) (int64, error) {
-	return 0, nil
-}
-
 func (m noopTaskMetaMgr) CheckAndFinishRestore(context.Context, bool) (bool, bool, error) {
 	return false, true, nil
 }
@@ -5,6 +5,7 @@ package restore
 import (
 	"context"
 	"database/sql/driver"
+	"sort"

 	"github.com/DATA-DOG/go-sqlmock"
 	. "github.com/pingcap/check"
@@ -262,3 +263,63 @@ func (s *metaMgrSuite) prepareMock(rowsVal [][]driver.Value, nextRowID *int64, u
 			WillReturnResult(sqlmock.NewResult(int64(0), int64(1)))
 	}
 }
+
+var _ = Suite(&taskMetaMgrSuite{})
+
+type taskMetaMgrSuite struct {
+	mgr    *dbTaskMetaMgr
+	mockDB sqlmock.Sqlmock
+}
+
+func (s *taskMetaMgrSuite) SetUpTest(c *C) {
+	db, m, err := sqlmock.New()
+	c.Assert(err, IsNil)
+
+	s.mgr = &dbTaskMetaMgr{
+		session:   db,
+		taskID:    1,
+		tableName: common.UniqueTable("test", "t1"),
+	}
+	s.mockDB = m
+}
+
+func (s *taskMetaMgrSuite) TestCheckTasksExclusively(c *C) {
+	s.mockDB.ExpectExec("SET SESSION tidb_txn_mode = 'pessimistic';").
+		WillReturnResult(sqlmock.NewResult(int64(0), int64(0)))
+	s.mockDB.ExpectBegin()
+	s.mockDB.ExpectQuery("SELECT task_id, pd_cfgs, status, state, source_bytes, cluster_avail from `test`.`t1` FOR UPDATE").
+		WillReturnRows(sqlmock.NewRows([]string{"task_id", "pd_cfgs", "status", "state", "source_bytes", "cluster_avail"}).
+			AddRow("0", "", taskMetaStatusInitial.String(), "0", "0", "0").
+			AddRow("1", "", taskMetaStatusInitial.String(), "0", "0", "0").
+			AddRow("2", "", taskMetaStatusInitial.String(), "0", "0", "0").
+			AddRow("3", "", taskMetaStatusInitial.String(), "0", "0", "0").
+			AddRow("4", "", taskMetaStatusInitial.String(), "0", "0", "0"))
+
+	s.mockDB.ExpectExec("\\QREPLACE INTO `test`.`t1` (task_id, pd_cfgs, status, state, source_bytes, cluster_avail) VALUES(?, ?, ?, ?, ?, ?)\\E").
+		WithArgs(int64(2), "", taskMetaStatusInitial.String(), int(0), uint64(2048), uint64(0)).
+		WillReturnResult(sqlmock.NewResult(0, 1))
+	s.mockDB.ExpectExec("\\QREPLACE INTO `test`.`t1` (task_id, pd_cfgs, status, state, source_bytes, cluster_avail) VALUES(?, ?, ?, ?, ?, ?)\\E").
+		WithArgs(int64(3), "", taskMetaStatusInitial.String(), int(0), uint64(3072), uint64(0)).
+		WillReturnResult(sqlmock.NewResult(0, 1))
+	s.mockDB.ExpectCommit()
+
+	err := s.mgr.CheckTasksExclusively(context.Background(), func(tasks []taskMeta) ([]taskMeta, error) {
+		c.Assert(len(tasks), Equals, 5)
+		sort.Slice(tasks, func(i, j int) bool {
+			return tasks[i].taskID < tasks[j].taskID
+		})
+		for j := 0; j < 5; j++ {
+			c.Assert(tasks[j].taskID, Equals, int64(j))
+		}
+
+		var newTasks []taskMeta
+		for j := 2; j < 4; j++ {
+			task := tasks[j]
+			task.sourceBytes = uint64(j * 1024)
+			newTasks = append(newTasks, task)
+		}
+		return newTasks, nil
+	})
+	c.Assert(err, IsNil)
+}
@@ -103,6 +103,7 @@ const (
 		status VARCHAR(32) NOT NULL,
 		state TINYINT(1) NOT NULL DEFAULT 0 COMMENT '0: normal, 1: exited before finish',
 		source_bytes BIGINT(20) UNSIGNED NOT NULL DEFAULT 0,
+		cluster_avail BIGINT(20) UNSIGNED NOT NULL DEFAULT 0,
 		PRIMARY KEY (task_id)
 	);`
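The new cluster_avail column persists the PD-reported available bytes alongside each task's source_bytes; as ClusterResource above shows, a task only queries PD when no other task has stored a value yet, so all concurrent instances size the cluster against the same snapshot.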
@@ -1763,7 +1764,8 @@ func (rc *Controller) isLocalBackend() bool {
 // preCheckRequirements checks
 // 1. Cluster resource
 // 2. Local node resource
-// 3. Lightning configuration
+// 3. Cluster region
+// 4. Lightning configuration
 // before restore tables start.
 func (rc *Controller) preCheckRequirements(ctx context.Context) error {
 	if err := rc.ClusterIsAvailable(ctx); err != nil {
@@ -1779,11 +1781,6 @@ func (rc *Controller) preCheckRequirements(ctx context.Context) error {
 	taskExist := false

 	if rc.isLocalBackend() {
-		source, err := rc.EstimateSourceData(ctx)
-		if err != nil {
-			return errors.Trace(err)
-		}
-
 		pdController, err := pdutil.NewPdController(ctx, rc.cfg.TiDB.PdAddr,
 			rc.tls.TLSConfig(), rc.tls.ToPDSecurityOption())
 		if err != nil {
@@ -1796,6 +1793,10 @@ func (rc *Controller) preCheckRequirements(ctx context.Context) error {
 			return errors.Trace(err)
 		}
 		if !taskExist {
+			source, err := rc.EstimateSourceData(ctx)
+			if err != nil {
+				return errors.Trace(err)
+			}
 			err = rc.LocalResource(ctx, source)
 			if err != nil {
 				rc.taskMgr.CleanupTask(ctx)
@@ -1805,6 +1806,9 @@ func (rc *Controller) preCheckRequirements(ctx context.Context) error {
 				rc.taskMgr.CleanupTask(ctx)
 				return errors.Trace(err)
 			}
+			if err := rc.CheckClusterRegion(ctx); err != nil {
+				return errors.Trace(err)
+			}
 		}
 	}
 	if rc.cfg.App.CheckRequirements && rc.tidbGlue.OwnsSQLExecutor() {
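Note the reordering in the two hunks above: EstimateSourceData used to run unconditionally at the top of the local-backend branch, but it is now called only when no other task already exists, so concurrent Lightning instances don't redundantly sample the data source; the new CheckClusterRegion precheck runs inside the same guarded block.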
@@ -16,6 +16,7 @@ package restore

 import (
 	"context"
+	"encoding/json"
 	"fmt"
 	"net/http"
 	"net/http/httptest"
@@ -34,6 +35,7 @@ import (
 	"github.com/pingcap/errors"
 	"github.com/pingcap/failpoint"
 	"github.com/pingcap/kvproto/pkg/import_kvpb"
+	"github.com/pingcap/kvproto/pkg/metapb"
 	"github.com/pingcap/parser"
 	"github.com/pingcap/parser/ast"
 	"github.com/pingcap/parser/model"
@@ -60,6 +62,7 @@ import (
 	"github.com/pingcap/tidb/br/pkg/version/build"
 	"github.com/pingcap/tidb/ddl"
 	tmock "github.com/pingcap/tidb/util/mock"
+	"github.com/tikv/pd/server/api"
 )

 var _ = Suite(&restoreSuite{})
@@ -1598,7 +1601,7 @@ func (s *tableRestoreSuite) TestCheckClusterResource(c *C) {
 					"id": 2
 				},
 				"status": {
-					"capacity": "24"
+					"available": "24"
 				}
 			}
 		]
@@ -1606,7 +1609,7 @@ func (s *tableRestoreSuite) TestCheckClusterResource(c *C) {
 			[]byte(`{
 				"max-replicas": 1
 			}`),
-			"(.*)Cluster capacity is rich(.*)",
+			"(.*)Cluster available is rich(.*)",
 			true,
 			0,
 		},
@@ -1619,7 +1622,7 @@ func (s *tableRestoreSuite) TestCheckClusterResource(c *C) {
 					"id": 2
 				},
 				"status": {
-					"capacity": "15"
+					"available": "15"
 				}
 			}
 		]
@@ -1680,6 +1683,130 @@ func (s *tableRestoreSuite) TestCheckClusterResource(c *C) {
 	}
 }

+type mockTaskMetaMgr struct {
+	taskMetaMgr
+}
+
+func (mockTaskMetaMgr) CheckTasksExclusively(ctx context.Context, action func(tasks []taskMeta) ([]taskMeta, error)) error {
+	_, err := action([]taskMeta{{
+		taskID: 1,
+		pdCfgs: "",
+		status: taskMetaStatusInitial,
+		state:  taskStateNormal,
+	}})
+	return err
+}
+
+func (s *tableRestoreSuite) TestCheckClusterRegion(c *C) {
+	type testCase struct {
+		stores         api.StoresInfo
+		emptyRegions   api.RegionsInfo
+		expectMsgs     []string
+		expectResult   bool
+		expectErrorCnt int
+	}
+
+	makeRegions := func(regionCnt int, storeID uint64) []api.RegionInfo {
+		var regions []api.RegionInfo
+		for i := 0; i < regionCnt; i++ {
+			regions = append(regions, api.RegionInfo{Peers: []api.MetaPeer{{Peer: &metapb.Peer{StoreId: storeID}}}})
+		}
+		return regions
+	}
+
+	testCases := []testCase{
+		{
+			stores: api.StoresInfo{Stores: []*api.StoreInfo{
+				{Store: &api.MetaStore{Store: &metapb.Store{Id: 1}}, Status: &api.StoreStatus{RegionCount: 200}},
+			}},
+			emptyRegions: api.RegionsInfo{
+				Regions: append([]api.RegionInfo(nil), makeRegions(100, 1)...),
+			},
+			expectMsgs:     []string{".*Cluster doesn't have too many empty regions.*", ".*Cluster region distribution is balanced.*"},
+			expectResult:   true,
+			expectErrorCnt: 0,
+		},
+		{
+			stores: api.StoresInfo{Stores: []*api.StoreInfo{
+				{Store: &api.MetaStore{Store: &metapb.Store{Id: 1}}, Status: &api.StoreStatus{RegionCount: 2000}},
+				{Store: &api.MetaStore{Store: &metapb.Store{Id: 2}}, Status: &api.StoreStatus{RegionCount: 3100}},
+				{Store: &api.MetaStore{Store: &metapb.Store{Id: 3}}, Status: &api.StoreStatus{RegionCount: 2500}},
+			}},
+			emptyRegions: api.RegionsInfo{
+				Regions: append(append(append([]api.RegionInfo(nil),
+					makeRegions(600, 1)...),
+					makeRegions(300, 2)...),
+					makeRegions(1200, 3)...),
+			},
+			expectMsgs: []string{
+				".*TiKV stores \\(3\\) contains more than 1000 empty regions respectively.*",
+				".*TiKV stores \\(1\\) contains more than 500 empty regions respectively.*",
+				".*Region distribution is unbalanced.*but we expect it should not be less than 0.75.*",
+			},
+			expectResult:   false,
+			expectErrorCnt: 1,
+		},
+		{
+			stores: api.StoresInfo{Stores: []*api.StoreInfo{
+				{Store: &api.MetaStore{Store: &metapb.Store{Id: 1}}, Status: &api.StoreStatus{RegionCount: 1200}},
+				{Store: &api.MetaStore{Store: &metapb.Store{Id: 2}}, Status: &api.StoreStatus{RegionCount: 3000}},
+				{Store: &api.MetaStore{Store: &metapb.Store{Id: 3}}, Status: &api.StoreStatus{RegionCount: 2500}},
+			}},
+			expectMsgs:     []string{".*Region distribution is unbalanced.*but we expect it must not be less than 0.5.*"},
+			expectResult:   false,
+			expectErrorCnt: 1,
+		},
+		{
+			stores: api.StoresInfo{Stores: []*api.StoreInfo{
+				{Store: &api.MetaStore{Store: &metapb.Store{Id: 1}}, Status: &api.StoreStatus{RegionCount: 0}},
+				{Store: &api.MetaStore{Store: &metapb.Store{Id: 2}}, Status: &api.StoreStatus{RegionCount: 2800}},
+				{Store: &api.MetaStore{Store: &metapb.Store{Id: 3}}, Status: &api.StoreStatus{RegionCount: 2500}},
+			}},
+			expectMsgs:     []string{".*Region distribution is unbalanced.*but we expect it must not be less than 0.5.*"},
+			expectResult:   false,
+			expectErrorCnt: 1,
+		},
+	}
+
+	mustMarshal := func(v interface{}) []byte {
+		data, err := json.Marshal(v)
+		c.Assert(err, IsNil)
+		return data
+	}
+
+	for _, ca := range testCases {
+		server := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
+			var err error
+			if req.URL.Path == pdStores {
+				_, err = w.Write(mustMarshal(ca.stores))
+			} else if req.URL.Path == pdEmptyRegions {
+				_, err = w.Write(mustMarshal(ca.emptyRegions))
+			} else {
+				w.WriteHeader(http.StatusNotFound)
+			}
+			c.Assert(err, IsNil)
+		}))
+
+		tls := common.NewTLSFromMockServer(server)
+		template := NewSimpleTemplate()
+
+		url := strings.TrimPrefix(server.URL, "https://")
+		cfg := &config.Config{TiDB: config.DBStore{PdAddr: url}}
+		rc := &Controller{cfg: cfg, tls: tls, taskMgr: mockTaskMetaMgr{}, checkTemplate: template}
+
+		err := rc.CheckClusterRegion(context.Background())
+		c.Assert(err, IsNil)
+		c.Assert(template.FailedCount(Critical), Equals, ca.expectErrorCnt)
+		c.Assert(template.Success(), Equals, ca.expectResult)
+
+		for _, expectMsg := range ca.expectMsgs {
+			c.Assert(strings.ReplaceAll(template.Output(), "\n", ""), Matches, expectMsg)
+		}
+
+		server.Close()
+	}
+}
+
 func (s *tableRestoreSuite) TestCheckHasLargeCSV(c *C) {
 	cases := []struct {
 		strictFormat bool
@@ -132,7 +132,7 @@ func (tr *TableRestore) populateChunks(ctx context.Context, rc *Controller, cp *checkpoints.TableCheckpoint) error {
 	return err
 }

-func (t *TableRestore) RebaseChunkRowIDs(cp *checkpoints.TableCheckpoint, rowIDBase int64) {
+func (tr *TableRestore) RebaseChunkRowIDs(cp *checkpoints.TableCheckpoint, rowIDBase int64) {
 	if rowIDBase == 0 {
 		return
 	}