lightning: auto adjust dynamic region configuration (#34537)

close pingcap/tidb#34536
This commit is contained in:
Chunzhu Li
2022-05-16 17:26:37 +08:00
committed by GitHub
parent 0e19c1cd64
commit 6d43ec79cc
12 changed files with 90 additions and 36 deletions

View File

@ -152,7 +152,7 @@ type AbstractBackend interface {
// ImportEngine imports engine data to the backend. If it returns ErrDuplicateDetected,
// it means there is duplicate detected. For this situation, all data in the engine must be imported.
// It's safe to reset or cleanup this engine.
ImportEngine(ctx context.Context, engineUUID uuid.UUID, regionSplitSize int64) error
ImportEngine(ctx context.Context, engineUUID uuid.UUID, regionSplitSize, regionSplitKeys int64) error
CleanupEngine(ctx context.Context, engineUUID uuid.UUID) error
@ -315,7 +315,7 @@ func (be Backend) CheckDiskQuota(quota int64) (
// into the target and then reset the engine to empty. This method will not
// close the engine. Make sure the engine is flushed manually before calling
// this method.
func (be Backend) UnsafeImportAndReset(ctx context.Context, engineUUID uuid.UUID, regionSplitSize int64) error {
func (be Backend) UnsafeImportAndReset(ctx context.Context, engineUUID uuid.UUID, regionSplitSize, regionSplitKeys int64) error {
// DO NOT call be.abstract.CloseEngine()! The engine should still be writable after
// calling UnsafeImportAndReset().
closedEngine := ClosedEngine{
@ -325,7 +325,7 @@ func (be Backend) UnsafeImportAndReset(ctx context.Context, engineUUID uuid.UUID
uuid: engineUUID,
},
}
if err := closedEngine.Import(ctx, regionSplitSize); err != nil {
if err := closedEngine.Import(ctx, regionSplitSize, regionSplitKeys); err != nil {
return err
}
return be.abstract.ResetEngine(ctx, engineUUID)
@ -445,12 +445,12 @@ func (en engine) unsafeClose(ctx context.Context, cfg *EngineConfig) (*ClosedEng
}
// Import the data written to the engine into the target.
func (engine *ClosedEngine) Import(ctx context.Context, regionSplitSize int64) error {
func (engine *ClosedEngine) Import(ctx context.Context, regionSplitSize, regionSplitKeys int64) error {
var err error
for i := 0; i < importMaxRetryTimes; i++ {
task := engine.logger.With(zap.Int("retryCnt", i)).Begin(zap.InfoLevel, "import")
err = engine.backend.ImportEngine(ctx, engine.uuid, regionSplitSize)
err = engine.backend.ImportEngine(ctx, engine.uuid, regionSplitSize, regionSplitKeys)
if !common.IsRetryableError(err) {
task.End(zap.ErrorLevel, err)
return err

View File

@ -54,7 +54,7 @@ func TestOpenCloseImportCleanUpEngine(t *testing.T) {
Return(nil).
After(openCall)
importCall := s.mockBackend.EXPECT().
ImportEngine(ctx, engineUUID, gomock.Any()).
ImportEngine(ctx, engineUUID, gomock.Any(), gomock.Any()).
Return(nil).
After(closeCall)
s.mockBackend.EXPECT().
@ -66,7 +66,7 @@ func TestOpenCloseImportCleanUpEngine(t *testing.T) {
require.NoError(t, err)
closedEngine, err := engine.Close(ctx, nil)
require.NoError(t, err)
err = closedEngine.Import(ctx, 1)
err = closedEngine.Import(ctx, 1, 1)
require.NoError(t, err)
err = closedEngine.Cleanup(ctx)
require.NoError(t, err)
@ -250,12 +250,12 @@ func TestImportFailedNoRetry(t *testing.T) {
s.mockBackend.EXPECT().CloseEngine(ctx, nil, gomock.Any()).Return(nil)
s.mockBackend.EXPECT().
ImportEngine(ctx, gomock.Any(), gomock.Any()).
ImportEngine(ctx, gomock.Any(), gomock.Any(), gomock.Any()).
Return(errors.Annotate(context.Canceled, "fake unrecoverable import error"))
closedEngine, err := s.backend.UnsafeCloseEngine(ctx, nil, "`db`.`table`", 1)
require.NoError(t, err)
err = closedEngine.Import(ctx, 1)
err = closedEngine.Import(ctx, 1, 1)
require.Error(t, err)
require.Regexp(t, "^fake unrecoverable import error", err.Error())
}
@ -268,14 +268,14 @@ func TestImportFailedWithRetry(t *testing.T) {
s.mockBackend.EXPECT().CloseEngine(ctx, nil, gomock.Any()).Return(nil)
s.mockBackend.EXPECT().
ImportEngine(ctx, gomock.Any(), gomock.Any()).
ImportEngine(ctx, gomock.Any(), gomock.Any(), gomock.Any()).
Return(errors.Annotate(driver.ErrBadConn, "fake recoverable import error")).
MinTimes(2)
s.mockBackend.EXPECT().RetryImportDelay().Return(time.Duration(0)).AnyTimes()
closedEngine, err := s.backend.UnsafeCloseEngine(ctx, nil, "`db`.`table`", 1)
require.NoError(t, err)
err = closedEngine.Import(ctx, 1)
err = closedEngine.Import(ctx, 1, 1)
require.Error(t, err)
require.Contains(t, err.Error(), "fake recoverable import error")
}
@ -288,16 +288,16 @@ func TestImportFailedRecovered(t *testing.T) {
s.mockBackend.EXPECT().CloseEngine(ctx, nil, gomock.Any()).Return(nil)
s.mockBackend.EXPECT().
ImportEngine(ctx, gomock.Any(), gomock.Any()).
ImportEngine(ctx, gomock.Any(), gomock.Any(), gomock.Any()).
Return(gmysql.ErrInvalidConn)
s.mockBackend.EXPECT().
ImportEngine(ctx, gomock.Any(), gomock.Any()).
ImportEngine(ctx, gomock.Any(), gomock.Any(), gomock.Any()).
Return(nil)
s.mockBackend.EXPECT().RetryImportDelay().Return(time.Duration(0)).AnyTimes()
closedEngine, err := s.backend.UnsafeCloseEngine(ctx, nil, "`db`.`table`", 1)
require.NoError(t, err)
err = closedEngine.Import(ctx, 1)
err = closedEngine.Import(ctx, 1, 1)
require.NoError(t, err)
}

View File

@ -88,10 +88,6 @@ const (
gRPCKeepAliveTimeout = 5 * time.Minute
gRPCBackOffMaxDelay = 10 * time.Minute
// See: https://github.com/tikv/tikv/blob/e030a0aae9622f3774df89c62f21b2171a72a69e/etc/config-template.toml#L360
// lower the max-key-count to avoid tikv trigger region auto split
regionMaxKeyCount = 1_280_000
defaultRegionSplitSize = 96 * units.MiB
// The max ranges count in a batch to split and scatter.
maxBatchSplitRanges = 4096
@ -823,7 +819,7 @@ func (local *local) WriteToTiKV(
// if region-split-size <= 96MiB, we bump the threshold a bit to avoid too many retry split
// because the range-properties is not 100% accurate
regionMaxSize := regionSplitSize
if regionSplitSize <= defaultRegionSplitSize {
if regionSplitSize <= int64(config.SplitRegionSize) {
regionMaxSize = regionSplitSize * 4 / 3
}
@ -1328,7 +1324,7 @@ func (local *local) writeAndIngestByRanges(ctx context.Context, engine *Engine,
return allErr
}
func (local *local) ImportEngine(ctx context.Context, engineUUID uuid.UUID, regionSplitSize int64) error {
func (local *local) ImportEngine(ctx context.Context, engineUUID uuid.UUID, regionSplitSize, regionSplitKeys int64) error {
lf := local.lockEngine(engineUUID, importMutexStateImport)
if lf == nil {
// skip if engine not exist. See the comment of `CloseEngine` for more detail.
@ -1342,9 +1338,16 @@ func (local *local) ImportEngine(ctx context.Context, engineUUID uuid.UUID, regi
log.L().Info("engine contains no kv, skip import", zap.Stringer("engine", engineUUID))
return nil
}
regionSplitKeys := int64(regionMaxKeyCount)
if regionSplitSize > defaultRegionSplitSize {
regionSplitKeys = int64(float64(regionSplitSize) / float64(defaultRegionSplitSize) * float64(regionMaxKeyCount))
kvRegionSplitSize, kvRegionSplitKeys, err := getRegionSplitSizeKeys(ctx, local.pdCtl.GetPDClient(), local.tls)
if err == nil {
if kvRegionSplitSize > regionSplitSize {
regionSplitSize = kvRegionSplitSize
}
if kvRegionSplitKeys > regionSplitKeys {
regionSplitKeys = kvRegionSplitKeys
}
} else {
log.L().Warn("fail to get region split keys and size", zap.Error(err))
}
// split sorted file into range by 96MB size per file
@ -1842,3 +1845,41 @@ func (local *local) EngineFileSizes() (res []backend.EngineFileSize) {
})
return
}
// getSplitConfFromStore fetches the coprocessor section of a single TiKV
// store's configuration via its HTTP status endpoint ("/config") and returns
// the region split size (in bytes) and the region split key count.
// The size is reported by TiKV as a human-readable string (e.g. "96MiB") and
// is converted to bytes before being returned.
func getSplitConfFromStore(ctx context.Context, host string, tls *common.TLS) (int64, int64, error) {
	var conf struct {
		Coprocessor struct {
			RegionSplitSize string `json:"region-split-size"`
			RegionSplitKeys int64  `json:"region-split-keys"`
		} `json:"coprocessor"`
	}
	err := tls.WithHost(host).GetJSON(ctx, "/config", &conf)
	if err != nil {
		return 0, 0, errors.Trace(err)
	}
	size, parseErr := units.FromHumanSize(conf.Coprocessor.RegionSplitSize)
	if parseErr != nil {
		return 0, 0, errors.Trace(parseErr)
	}
	return size, conf.Coprocessor.RegionSplitKeys, nil
}
// getRegionSplitSizeKeys asks PD for all non-tombstone stores and returns the
// region split size/keys configuration from the first store that can be
// queried successfully. TiFlash nodes and stores without a status address are
// skipped, since they do not serve the TiKV coprocessor configuration.
// An error is returned only when no store yields a usable answer.
func getRegionSplitSizeKeys(ctx context.Context, cli pd.Client, tls *common.TLS) (int64, int64, error) {
	stores, err := cli.GetAllStores(ctx, pd.WithExcludeTombstone())
	if err != nil {
		return 0, 0, err
	}
	for _, s := range stores {
		if s.StatusAddress == "" || version.IsTiFlash(s) {
			continue
		}
		size, keys, confErr := getSplitConfFromStore(ctx, s.StatusAddress, tls)
		if confErr != nil {
			// Best effort: log and try the next store instead of failing fast.
			log.L().Warn("get region split size and keys failed",
				zap.Error(confErr), zap.String("store", s.StatusAddress))
			continue
		}
		return size, keys, nil
	}
	return 0, 0, errors.New("get region split size and keys failed")
}

View File

@ -79,7 +79,7 @@ func (b noopBackend) CloseEngine(ctx context.Context, cfg *backend.EngineConfig,
return nil
}
func (b noopBackend) ImportEngine(ctx context.Context, engineUUID uuid.UUID, regionSplitSize int64) error {
func (b noopBackend) ImportEngine(ctx context.Context, engineUUID uuid.UUID, regionSplitSize, regionSplitKeys int64) error {
return nil
}

View File

@ -432,7 +432,7 @@ func (be *tidbBackend) ResolveDuplicateRows(ctx context.Context, tbl table.Table
return nil
}
func (be *tidbBackend) ImportEngine(context.Context, uuid.UUID, int64) error {
func (be *tidbBackend) ImportEngine(context.Context, uuid.UUID, int64, int64) error {
return nil
}

View File

@ -523,6 +523,7 @@ type TikvImporter struct {
MaxKVPairs int `toml:"max-kv-pairs" json:"max-kv-pairs"`
SendKVPairs int `toml:"send-kv-pairs" json:"send-kv-pairs"`
RegionSplitSize ByteSize `toml:"region-split-size" json:"region-split-size"`
RegionSplitKeys int `toml:"region-split-keys" json:"region-split-keys"`
SortedKVDir string `toml:"sorted-kv-dir" json:"sorted-kv-dir"`
DiskQuota ByteSize `toml:"disk-quota" json:"disk-quota"`
RangeConcurrency int `toml:"range-concurrency" json:"range-concurrency"`

View File

@ -20,9 +20,12 @@ import (
const (
// mydumper
ReadBlockSize ByteSize = 64 * units.KiB
MaxRegionSize ByteSize = 256 * units.MiB
ReadBlockSize ByteSize = 64 * units.KiB
MaxRegionSize ByteSize = 256 * units.MiB
// See: https://github.com/tikv/tikv/blob/e030a0aae9622f3774df89c62f21b2171a72a69e/etc/config-template.toml#L360
// lower the max-key-count to avoid tikv trigger region auto split
SplitRegionSize ByteSize = 96 * units.MiB
SplitRegionKeys int = 1_280_000
MaxSplitRegionSizeRatio int = 10
BufferSizeScale = 5

View File

@ -1807,7 +1807,7 @@ func (rc *Controller) enforceDiskQuota(ctx context.Context) {
var importErr error
for _, engine := range largeEngines {
// Use a larger split region size to avoid split the same region by many times.
if err := rc.backend.UnsafeImportAndReset(ctx, engine, int64(config.SplitRegionSize)*int64(config.MaxSplitRegionSizeRatio)); err != nil {
if err := rc.backend.UnsafeImportAndReset(ctx, engine, int64(config.SplitRegionSize)*int64(config.MaxSplitRegionSizeRatio), int64(config.SplitRegionKeys)*int64(config.MaxSplitRegionSizeRatio)); err != nil {
importErr = multierr.Append(importErr, err)
}
}

View File

@ -921,6 +921,8 @@ func (tr *TableRestore) importKV(
) error {
task := closedEngine.Logger().Begin(zap.InfoLevel, "import and cleanup engine")
regionSplitSize := int64(rc.cfg.TikvImporter.RegionSplitSize)
regionSplitKeys := int64(rc.cfg.TikvImporter.RegionSplitKeys)
if regionSplitSize == 0 && rc.taskMgr != nil {
regionSplitSize = int64(config.SplitRegionSize)
if err := rc.taskMgr.CheckTasksExclusively(ctx, func(tasks []taskMeta) ([]taskMeta, error) {
@ -932,7 +934,14 @@ func (tr *TableRestore) importKV(
return errors.Trace(err)
}
}
err := closedEngine.Import(ctx, regionSplitSize)
if regionSplitKeys == 0 {
if regionSplitSize > int64(config.SplitRegionSize) {
regionSplitKeys = int64(float64(regionSplitSize) / float64(config.SplitRegionSize) * float64(config.SplitRegionKeys))
} else {
regionSplitKeys = int64(config.SplitRegionKeys)
}
}
err := closedEngine.Import(ctx, regionSplitSize, regionSplitKeys)
saveCpErr := rc.saveStatusCheckpoint(ctx, tr.tableName, engineID, err, checkpoints.CheckpointStatusImported)
// Don't clean up when save checkpoint failed, because we will verifyLocalFile and import engine again after restart.
if err == nil && saveCpErr == nil {

View File

@ -831,7 +831,7 @@ func (s *tableRestoreSuite) TestImportKVSuccess() {
CloseEngine(ctx, nil, engineUUID).
Return(nil)
mockBackend.EXPECT().
ImportEngine(ctx, engineUUID, gomock.Any()).
ImportEngine(ctx, engineUUID, gomock.Any(), gomock.Any()).
Return(nil)
mockBackend.EXPECT().
CleanupEngine(ctx, engineUUID).
@ -866,7 +866,7 @@ func (s *tableRestoreSuite) TestImportKVFailure() {
CloseEngine(ctx, nil, engineUUID).
Return(nil)
mockBackend.EXPECT().
ImportEngine(ctx, engineUUID, gomock.Any()).
ImportEngine(ctx, engineUUID, gomock.Any(), gomock.Any()).
Return(errors.Annotate(context.Canceled, "fake import error"))
closedEngine, err := importer.UnsafeCloseEngineWithUUID(ctx, nil, "tag", engineUUID)

View File

@ -185,17 +185,17 @@ func (mr *MockBackendMockRecorder) FlushEngine(arg0, arg1 interface{}) *gomock.C
}
// ImportEngine mocks base method.
func (m *MockBackend) ImportEngine(arg0 context.Context, arg1 uuid.UUID, arg2 int64) error {
func (m *MockBackend) ImportEngine(arg0 context.Context, arg1 uuid.UUID, arg2, arg3 int64) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "ImportEngine", arg0, arg1, arg2)
ret := m.ctrl.Call(m, "ImportEngine", arg0, arg1, arg2, arg3)
ret0, _ := ret[0].(error)
return ret0
}
// ImportEngine indicates an expected call of ImportEngine.
func (mr *MockBackendMockRecorder) ImportEngine(arg0, arg1, arg2 interface{}) *gomock.Call {
func (mr *MockBackendMockRecorder) ImportEngine(arg0, arg1, arg2, arg3 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ImportEngine", reflect.TypeOf((*MockBackend)(nil).ImportEngine), arg0, arg1, arg2)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ImportEngine", reflect.TypeOf((*MockBackend)(nil).ImportEngine), arg0, arg1, arg2, arg3)
}
// LocalWriter mocks base method.

View File

@ -68,7 +68,7 @@ func InitMetricsVector(labels prometheus.Labels) {
Namespace: "dumpling",
Subsystem: "write",
Name: "receive_chunk_duration_time",
Help: "Bucketed histogram of write time (s) of files",
Help: "Bucketed histogram of receiving time (s) of chunks",
Buckets: prometheus.ExponentialBuckets(0.00005, 2, 20),
}, labelNames)
errorCount = prometheus.NewCounterVec(