From fa82cd47ec5baa09dc0b712d793decf29dd80deb Mon Sep 17 00:00:00 2001 From: rebelice Date: Fri, 15 Oct 2021 11:13:27 +0800 Subject: [PATCH] domain, session: add plan replayer gc (#28799) --- config/config.go | 2 + domain/domain.go | 26 ++++++++++- domain/domain_test.go | 4 +- domain/plan_replayer.go | 87 ++++++++++++++++++++++++++++++++++++ domain/plan_replayer_test.go | 60 +++++++++++++++++++++++++ session/session.go | 7 +++ session/tidb.go | 16 ++++++- tidb-server/main.go | 2 + 8 files changed, 200 insertions(+), 4 deletions(-) create mode 100644 domain/plan_replayer.go create mode 100644 domain/plan_replayer_test.go diff --git a/config/config.go b/config/config.go index 2ad654f6fb..9b1a8ca11c 100644 --- a/config/config.go +++ b/config/config.go @@ -429,6 +429,7 @@ type Performance struct { MaxTxnTTL uint64 `toml:"max-txn-ttl" json:"max-txn-ttl"` MemProfileInterval string `toml:"mem-profile-interval" json:"mem-profile-interval"` IndexUsageSyncLease string `toml:"index-usage-sync-lease" json:"index-usage-sync-lease"` + PlanReplayerGCLease string `toml:"plan-replayer-gc-lease" json:"plan-replayer-gc-lease"` GOGC int `toml:"gogc" json:"gogc"` EnforceMPP bool `toml:"enforce-mpp" json:"enforce-mpp"` } @@ -641,6 +642,7 @@ var defaultConf = Config{ IndexUsageSyncLease: "0s", GOGC: 100, EnforceMPP: false, + PlanReplayerGCLease: "10m", }, ProxyProtocol: ProxyProtocol{ Networks: "", diff --git a/domain/domain.go b/domain/domain.go index 5cc8ef1117..1e206175a2 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -86,6 +86,7 @@ type Domain struct { statsUpdating sync2.AtomicInt32 cancel context.CancelFunc indexUsageSyncLease time.Duration + planReplayer *planReplayer serverID uint64 serverIDSession *concurrency.Session @@ -652,7 +653,7 @@ func (do *Domain) Close() { const resourceIdleTimeout = 3 * time.Minute // resources in the ResourcePool will be recycled after idleTimeout // NewDomain creates a new domain. Should not create multiple domains for the same store. -func NewDomain(store kv.Storage, ddlLease time.Duration, statsLease time.Duration, idxUsageSyncLease time.Duration, factory pools.Factory, onClose func()) *Domain { +func NewDomain(store kv.Storage, ddlLease time.Duration, statsLease time.Duration, idxUsageSyncLease time.Duration, planReplayerGCLease time.Duration, factory pools.Factory, onClose func()) *Domain { capacity := 200 // capacity of the sysSessionPool size do := &Domain{ store: store, @@ -662,6 +663,7 @@ func NewDomain(store kv.Storage, ddlLease time.Duration, statsLease time.Duratio infoCache: infoschema.NewCache(16), slowQuery: newTopNSlowQueries(30, time.Hour*24*7, 500), indexUsageSyncLease: idxUsageSyncLease, + planReplayer: &planReplayer{planReplayerGCLease: planReplayerGCLease}, onClose: onClose, } @@ -1140,6 +1142,28 @@ func (do *Domain) TelemetryRotateSubWindowLoop(ctx sessionctx.Context) { }() } +// PlanReplayerLoop creates a goroutine that handles `exit` and `gc`. +func (do *Domain) PlanReplayerLoop(ctx sessionctx.Context) { + do.wg.Add(1) + go func() { + gcTicker := time.NewTicker(do.planReplayer.planReplayerGCLease) + defer func() { + gcTicker.Stop() + do.wg.Done() + logutil.BgLogger().Info("PlanReplayerLoop exited.") + util.Recover(metrics.LabelDomain, "PlanReplayerLoop", nil, false) + }() + for { + select { + case <-do.exit: + return + case <-gcTicker.C: + do.planReplayer.planReplayerGC(time.Hour) + } + } + }() +} + // StatsHandle returns the statistic handle. func (do *Domain) StatsHandle() *handle.Handle { return (*handle.Handle)(atomic.LoadPointer(&do.statsHandle)) diff --git a/domain/domain_test.go b/domain/domain_test.go index 1285ee3a10..1ba21edd91 100644 --- a/domain/domain_test.go +++ b/domain/domain_test.go @@ -67,7 +67,7 @@ func SubTestInfo(t *testing.T) { Storage: s, pdAddrs: []string{cluster.Members[0].GRPCAddr()}} ddlLease := 80 * time.Millisecond - dom := NewDomain(mockStore, ddlLease, 0, 0, mockFactory, nil) + dom := NewDomain(mockStore, ddlLease, 0, 0, 0, mockFactory, nil) defer func() { dom.Close() err := s.Close() @@ -160,7 +160,7 @@ func SubTestDomain(t *testing.T) { require.NoError(t, err) ddlLease := 80 * time.Millisecond - dom := NewDomain(store, ddlLease, 0, 0, mockFactory, nil) + dom := NewDomain(store, ddlLease, 0, 0, 0, mockFactory, nil) err = dom.Init(ddlLease, sysMockFactory) require.NoError(t, err) diff --git a/domain/plan_replayer.go b/domain/plan_replayer.go new file mode 100644 index 0000000000..37fbbc5047 --- /dev/null +++ b/domain/plan_replayer.go @@ -0,0 +1,87 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package domain + +import ( + "errors" + "fmt" + "io/ioutil" + "os" + "path/filepath" + "strconv" + "strings" + "sync" + "time" + + "github.com/pingcap/tidb/util/logutil" + "go.uber.org/zap" +) + +type planReplayer struct { + sync.Mutex + planReplayerGCLease time.Duration +} + +// GetPlanReplayerDirName returns plan replayer directory path. +// The path is related to the process id. +func GetPlanReplayerDirName() string { + return filepath.Join(os.TempDir(), "replayer", strconv.Itoa(os.Getpid())) +} + +func parseTime(s string) (time.Time, error) { + startIdx := strings.LastIndex(s, "_") + if startIdx == -1 { + return time.Time{}, errors.New("failed to parse the file :" + s) + } + endIdx := strings.LastIndex(s, ".") + if endIdx == -1 || endIdx <= startIdx+1 { + return time.Time{}, errors.New("failed to parse the file :" + s) + } + i, err := strconv.ParseInt(s[startIdx+1:endIdx], 10, 64) + if err != nil { + return time.Time{}, errors.New("failed to parse the file :" + s) + } + return time.Unix(0, i), nil +} + +func (p *planReplayer) planReplayerGC(t time.Duration) { + p.Lock() + defer p.Unlock() + path := GetPlanReplayerDirName() + files, err := ioutil.ReadDir(path) + if err != nil { + if !os.IsNotExist(err) { + logutil.BgLogger().Warn("[PlanReplayer] open plan replayer directory failed", zap.Error(err)) + } + return + } + + gcTime := time.Now().Add(-t) + for _, f := range files { + createTime, err := parseTime(f.Name()) + if err != nil { + logutil.BgLogger().Warn("[PlanReplayer] parseTime failed", zap.Error(err)) + continue + } + if !createTime.After(gcTime) { + err := os.Remove(filepath.Join(path, f.Name())) + if err != nil { + logutil.BgLogger().Warn("[PlanReplayer] remove file failed", zap.Error(err)) + continue + } + logutil.BgLogger().Info(fmt.Sprintf("[PlanReplayer] GC %s", f.Name())) + } + } +} diff --git a/domain/plan_replayer_test.go b/domain/plan_replayer_test.go new file mode 100644 index 0000000000..f21abfd1fe --- /dev/null +++ b/domain/plan_replayer_test.go @@ -0,0 +1,60 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package domain + +import ( + "fmt" + "os" + "path/filepath" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestPlanReplayerGC(t *testing.T) { + startTime := time.Now() + time := startTime.UnixNano() + fileName := fmt.Sprintf("replayer_single_xxxxxx_%v.zip", time) + err := os.MkdirAll(GetPlanReplayerDirName(), os.ModePerm) + require.Nil(t, err) + path := filepath.Join(GetPlanReplayerDirName(), fileName) + zf, err := os.Create(path) + require.Nil(t, err) + zf.Close() + + handler := &planReplayer{} + handler.planReplayerGC(0) + + _, err = os.Stat(path) + require.NotNil(t, err) + require.True(t, os.IsNotExist(err)) +} + +func TestPlanReplayerParseTime(t *testing.T) { + nowTime := time.Now() + name1 := fmt.Sprintf("replayer_single_xxxxxx_%v.zip", nowTime.UnixNano()) + pt, err := parseTime(name1) + require.Nil(t, err) + require.True(t, pt.Equal(nowTime)) + + name2 := fmt.Sprintf("replayer_single_xxxxxx_%v1.zip", nowTime.UnixNano()) + _, err = parseTime(name2) + require.NotNil(t, err) + + name3 := fmt.Sprintf("replayer_single_xxxxxx_%v._zip", nowTime.UnixNano()) + _, err = parseTime(name3) + require.NotNil(t, err) +} diff --git a/session/session.go b/session/session.go index 10c36393a1..6353b2c001 100644 --- a/session/session.go +++ b/session/session.go @@ -2552,6 +2552,13 @@ func BootstrapSession(store kv.Storage) (*domain.Domain, error) { if err != nil { return nil, err } + + se8, err := createSession(store) + if err != nil { + return nil, err + } + dom.PlanReplayerLoop(se8) + if raw, ok := store.(kv.EtcdBackend); ok { err = raw.StartGCWorker() if err != nil { diff --git a/session/tidb.go b/session/tidb.go index 74bed53754..5e0256efeb 100644 --- a/session/tidb.go +++ b/session/tidb.go @@ -67,6 +67,7 @@ func (dm *domainMap) Get(store kv.Storage) (d *domain.Domain, err error) { ddlLease := time.Duration(atomic.LoadInt64(&schemaLease)) statisticLease := time.Duration(atomic.LoadInt64(&statsLease)) idxUsageSyncLease := GetIndexUsageSyncLease() + planReplayerGCLease := GetPlanReplayerGCLease() err = util.RunWithRetry(util.DefaultMaxRetries, util.RetryInterval, func() (retry bool, err1 error) { logutil.BgLogger().Info("new domain", zap.String("store", store.UUID()), @@ -78,7 +79,7 @@ func (dm *domainMap) Get(store kv.Storage) (d *domain.Domain, err error) { onClose := func() { dm.Delete(store) } - d = domain.NewDomain(store, ddlLease, statisticLease, idxUsageSyncLease, factory, onClose) + d = domain.NewDomain(store, ddlLease, statisticLease, idxUsageSyncLease, planReplayerGCLease, factory, onClose) err1 = d.Init(ddlLease, sysFactory) if err1 != nil { // If we don't clean it, there are some dirty data when retrying the function of Init. @@ -125,6 +126,9 @@ var ( // Because we have not completed GC and other functions, we set it to 0. // TODO: Set indexUsageSyncLease to 60s. indexUsageSyncLease = int64(0 * time.Second) + + // planReplayerGCLease is the time for plan replayer gc. + planReplayerGCLease = int64(10 * time.Minute) ) // ResetStoreForWithTiKVTest is only used in the test code. @@ -170,6 +174,16 @@ func GetIndexUsageSyncLease() time.Duration { return time.Duration(atomic.LoadInt64(&indexUsageSyncLease)) } +// SetPlanReplayerGCLease changes the default plan repalyer gc lease time. +func SetPlanReplayerGCLease(lease time.Duration) { + atomic.StoreInt64(&planReplayerGCLease, int64(lease)) +} + +// GetPlanReplayerGCLease returns the plan replayer gc lease time. +func GetPlanReplayerGCLease() time.Duration { + return time.Duration(atomic.LoadInt64(&planReplayerGCLease)) +} + // DisableStats4Test disables the stats for tests. func DisableStats4Test() { SetStatsLease(-1) diff --git a/tidb-server/main.go b/tidb-server/main.go index 6505d2aeac..ae89bed914 100644 --- a/tidb-server/main.go +++ b/tidb-server/main.go @@ -527,6 +527,8 @@ func setGlobalVars() { session.SetStatsLease(statsLeaseDuration) indexUsageSyncLeaseDuration := parseDuration(cfg.Performance.IndexUsageSyncLease) session.SetIndexUsageSyncLease(indexUsageSyncLeaseDuration) + planReplayerGCLease := parseDuration(cfg.Performance.PlanReplayerGCLease) + session.SetPlanReplayerGCLease(planReplayerGCLease) bindinfo.Lease = parseDuration(cfg.Performance.BindInfoLease) domain.RunAutoAnalyze = cfg.Performance.RunAutoAnalyze statistics.FeedbackProbability.Store(cfg.Performance.FeedbackProbability)