domain, session: add plan replayer gc (#28799)
This commit is contained in:
@ -429,6 +429,7 @@ type Performance struct {
|
||||
MaxTxnTTL uint64 `toml:"max-txn-ttl" json:"max-txn-ttl"`
|
||||
MemProfileInterval string `toml:"mem-profile-interval" json:"mem-profile-interval"`
|
||||
IndexUsageSyncLease string `toml:"index-usage-sync-lease" json:"index-usage-sync-lease"`
|
||||
PlanReplayerGCLease string `toml:"plan-replayer-gc-lease" json:"plan-replayer-gc-lease"`
|
||||
GOGC int `toml:"gogc" json:"gogc"`
|
||||
EnforceMPP bool `toml:"enforce-mpp" json:"enforce-mpp"`
|
||||
}
|
||||
@ -641,6 +642,7 @@ var defaultConf = Config{
|
||||
IndexUsageSyncLease: "0s",
|
||||
GOGC: 100,
|
||||
EnforceMPP: false,
|
||||
PlanReplayerGCLease: "10m",
|
||||
},
|
||||
ProxyProtocol: ProxyProtocol{
|
||||
Networks: "",
|
||||
|
||||
@ -86,6 +86,7 @@ type Domain struct {
|
||||
statsUpdating sync2.AtomicInt32
|
||||
cancel context.CancelFunc
|
||||
indexUsageSyncLease time.Duration
|
||||
planReplayer *planReplayer
|
||||
|
||||
serverID uint64
|
||||
serverIDSession *concurrency.Session
|
||||
@ -652,7 +653,7 @@ func (do *Domain) Close() {
|
||||
const resourceIdleTimeout = 3 * time.Minute // resources in the ResourcePool will be recycled after idleTimeout
|
||||
|
||||
// NewDomain creates a new domain. Should not create multiple domains for the same store.
|
||||
func NewDomain(store kv.Storage, ddlLease time.Duration, statsLease time.Duration, idxUsageSyncLease time.Duration, factory pools.Factory, onClose func()) *Domain {
|
||||
func NewDomain(store kv.Storage, ddlLease time.Duration, statsLease time.Duration, idxUsageSyncLease time.Duration, planReplayerGCLease time.Duration, factory pools.Factory, onClose func()) *Domain {
|
||||
capacity := 200 // capacity of the sysSessionPool size
|
||||
do := &Domain{
|
||||
store: store,
|
||||
@ -662,6 +663,7 @@ func NewDomain(store kv.Storage, ddlLease time.Duration, statsLease time.Duratio
|
||||
infoCache: infoschema.NewCache(16),
|
||||
slowQuery: newTopNSlowQueries(30, time.Hour*24*7, 500),
|
||||
indexUsageSyncLease: idxUsageSyncLease,
|
||||
planReplayer: &planReplayer{planReplayerGCLease: planReplayerGCLease},
|
||||
onClose: onClose,
|
||||
}
|
||||
|
||||
@ -1140,6 +1142,28 @@ func (do *Domain) TelemetryRotateSubWindowLoop(ctx sessionctx.Context) {
|
||||
}()
|
||||
}
|
||||
|
||||
// PlanReplayerLoop creates a goroutine that handles `exit` and `gc`.
|
||||
func (do *Domain) PlanReplayerLoop(ctx sessionctx.Context) {
|
||||
do.wg.Add(1)
|
||||
go func() {
|
||||
gcTicker := time.NewTicker(do.planReplayer.planReplayerGCLease)
|
||||
defer func() {
|
||||
gcTicker.Stop()
|
||||
do.wg.Done()
|
||||
logutil.BgLogger().Info("PlanReplayerLoop exited.")
|
||||
util.Recover(metrics.LabelDomain, "PlanReplayerLoop", nil, false)
|
||||
}()
|
||||
for {
|
||||
select {
|
||||
case <-do.exit:
|
||||
return
|
||||
case <-gcTicker.C:
|
||||
do.planReplayer.planReplayerGC(time.Hour)
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// StatsHandle returns the statistic handle.
|
||||
func (do *Domain) StatsHandle() *handle.Handle {
|
||||
return (*handle.Handle)(atomic.LoadPointer(&do.statsHandle))
|
||||
|
||||
@ -67,7 +67,7 @@ func SubTestInfo(t *testing.T) {
|
||||
Storage: s,
|
||||
pdAddrs: []string{cluster.Members[0].GRPCAddr()}}
|
||||
ddlLease := 80 * time.Millisecond
|
||||
dom := NewDomain(mockStore, ddlLease, 0, 0, mockFactory, nil)
|
||||
dom := NewDomain(mockStore, ddlLease, 0, 0, 0, mockFactory, nil)
|
||||
defer func() {
|
||||
dom.Close()
|
||||
err := s.Close()
|
||||
@ -160,7 +160,7 @@ func SubTestDomain(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
|
||||
ddlLease := 80 * time.Millisecond
|
||||
dom := NewDomain(store, ddlLease, 0, 0, mockFactory, nil)
|
||||
dom := NewDomain(store, ddlLease, 0, 0, 0, mockFactory, nil)
|
||||
err = dom.Init(ddlLease, sysMockFactory)
|
||||
require.NoError(t, err)
|
||||
|
||||
|
||||
87
domain/plan_replayer.go
Normal file
87
domain/plan_replayer.go
Normal file
@ -0,0 +1,87 @@
|
||||
// Copyright 2021 PingCAP, Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package domain
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/pingcap/tidb/util/logutil"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type planReplayer struct {
|
||||
sync.Mutex
|
||||
planReplayerGCLease time.Duration
|
||||
}
|
||||
|
||||
// GetPlanReplayerDirName returns plan replayer directory path.
|
||||
// The path is related to the process id.
|
||||
func GetPlanReplayerDirName() string {
|
||||
return filepath.Join(os.TempDir(), "replayer", strconv.Itoa(os.Getpid()))
|
||||
}
|
||||
|
||||
func parseTime(s string) (time.Time, error) {
|
||||
startIdx := strings.LastIndex(s, "_")
|
||||
if startIdx == -1 {
|
||||
return time.Time{}, errors.New("failed to parse the file :" + s)
|
||||
}
|
||||
endIdx := strings.LastIndex(s, ".")
|
||||
if endIdx == -1 || endIdx <= startIdx+1 {
|
||||
return time.Time{}, errors.New("failed to parse the file :" + s)
|
||||
}
|
||||
i, err := strconv.ParseInt(s[startIdx+1:endIdx], 10, 64)
|
||||
if err != nil {
|
||||
return time.Time{}, errors.New("failed to parse the file :" + s)
|
||||
}
|
||||
return time.Unix(0, i), nil
|
||||
}
|
||||
|
||||
func (p *planReplayer) planReplayerGC(t time.Duration) {
|
||||
p.Lock()
|
||||
defer p.Unlock()
|
||||
path := GetPlanReplayerDirName()
|
||||
files, err := ioutil.ReadDir(path)
|
||||
if err != nil {
|
||||
if !os.IsNotExist(err) {
|
||||
logutil.BgLogger().Warn("[PlanReplayer] open plan replayer directory failed", zap.Error(err))
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
gcTime := time.Now().Add(-t)
|
||||
for _, f := range files {
|
||||
createTime, err := parseTime(f.Name())
|
||||
if err != nil {
|
||||
logutil.BgLogger().Warn("[PlanReplayer] parseTime failed", zap.Error(err))
|
||||
continue
|
||||
}
|
||||
if !createTime.After(gcTime) {
|
||||
err := os.Remove(filepath.Join(path, f.Name()))
|
||||
if err != nil {
|
||||
logutil.BgLogger().Warn("[PlanReplayer] remove file failed", zap.Error(err))
|
||||
continue
|
||||
}
|
||||
logutil.BgLogger().Info(fmt.Sprintf("[PlanReplayer] GC %s", f.Name()))
|
||||
}
|
||||
}
|
||||
}
|
||||
60
domain/plan_replayer_test.go
Normal file
60
domain/plan_replayer_test.go
Normal file
@ -0,0 +1,60 @@
|
||||
// Copyright 2021 PingCAP, Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package domain
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestPlanReplayerGC(t *testing.T) {
|
||||
startTime := time.Now()
|
||||
time := startTime.UnixNano()
|
||||
fileName := fmt.Sprintf("replayer_single_xxxxxx_%v.zip", time)
|
||||
err := os.MkdirAll(GetPlanReplayerDirName(), os.ModePerm)
|
||||
require.Nil(t, err)
|
||||
path := filepath.Join(GetPlanReplayerDirName(), fileName)
|
||||
zf, err := os.Create(path)
|
||||
require.Nil(t, err)
|
||||
zf.Close()
|
||||
|
||||
handler := &planReplayer{}
|
||||
handler.planReplayerGC(0)
|
||||
|
||||
_, err = os.Stat(path)
|
||||
require.NotNil(t, err)
|
||||
require.True(t, os.IsNotExist(err))
|
||||
}
|
||||
|
||||
func TestPlanReplayerParseTime(t *testing.T) {
|
||||
nowTime := time.Now()
|
||||
name1 := fmt.Sprintf("replayer_single_xxxxxx_%v.zip", nowTime.UnixNano())
|
||||
pt, err := parseTime(name1)
|
||||
require.Nil(t, err)
|
||||
require.True(t, pt.Equal(nowTime))
|
||||
|
||||
name2 := fmt.Sprintf("replayer_single_xxxxxx_%v1.zip", nowTime.UnixNano())
|
||||
_, err = parseTime(name2)
|
||||
require.NotNil(t, err)
|
||||
|
||||
name3 := fmt.Sprintf("replayer_single_xxxxxx_%v._zip", nowTime.UnixNano())
|
||||
_, err = parseTime(name3)
|
||||
require.NotNil(t, err)
|
||||
}
|
||||
@ -2552,6 +2552,13 @@ func BootstrapSession(store kv.Storage) (*domain.Domain, error) {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
se8, err := createSession(store)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
dom.PlanReplayerLoop(se8)
|
||||
|
||||
if raw, ok := store.(kv.EtcdBackend); ok {
|
||||
err = raw.StartGCWorker()
|
||||
if err != nil {
|
||||
|
||||
@ -67,6 +67,7 @@ func (dm *domainMap) Get(store kv.Storage) (d *domain.Domain, err error) {
|
||||
ddlLease := time.Duration(atomic.LoadInt64(&schemaLease))
|
||||
statisticLease := time.Duration(atomic.LoadInt64(&statsLease))
|
||||
idxUsageSyncLease := GetIndexUsageSyncLease()
|
||||
planReplayerGCLease := GetPlanReplayerGCLease()
|
||||
err = util.RunWithRetry(util.DefaultMaxRetries, util.RetryInterval, func() (retry bool, err1 error) {
|
||||
logutil.BgLogger().Info("new domain",
|
||||
zap.String("store", store.UUID()),
|
||||
@ -78,7 +79,7 @@ func (dm *domainMap) Get(store kv.Storage) (d *domain.Domain, err error) {
|
||||
onClose := func() {
|
||||
dm.Delete(store)
|
||||
}
|
||||
d = domain.NewDomain(store, ddlLease, statisticLease, idxUsageSyncLease, factory, onClose)
|
||||
d = domain.NewDomain(store, ddlLease, statisticLease, idxUsageSyncLease, planReplayerGCLease, factory, onClose)
|
||||
err1 = d.Init(ddlLease, sysFactory)
|
||||
if err1 != nil {
|
||||
// If we don't clean it, there are some dirty data when retrying the function of Init.
|
||||
@ -125,6 +126,9 @@ var (
|
||||
// Because we have not completed GC and other functions, we set it to 0.
|
||||
// TODO: Set indexUsageSyncLease to 60s.
|
||||
indexUsageSyncLease = int64(0 * time.Second)
|
||||
|
||||
// planReplayerGCLease is the time for plan replayer gc.
|
||||
planReplayerGCLease = int64(10 * time.Minute)
|
||||
)
|
||||
|
||||
// ResetStoreForWithTiKVTest is only used in the test code.
|
||||
@ -170,6 +174,16 @@ func GetIndexUsageSyncLease() time.Duration {
|
||||
return time.Duration(atomic.LoadInt64(&indexUsageSyncLease))
|
||||
}
|
||||
|
||||
// SetPlanReplayerGCLease changes the default plan repalyer gc lease time.
|
||||
func SetPlanReplayerGCLease(lease time.Duration) {
|
||||
atomic.StoreInt64(&planReplayerGCLease, int64(lease))
|
||||
}
|
||||
|
||||
// GetPlanReplayerGCLease returns the plan replayer gc lease time.
|
||||
func GetPlanReplayerGCLease() time.Duration {
|
||||
return time.Duration(atomic.LoadInt64(&planReplayerGCLease))
|
||||
}
|
||||
|
||||
// DisableStats4Test disables the stats for tests.
|
||||
func DisableStats4Test() {
|
||||
SetStatsLease(-1)
|
||||
|
||||
@ -527,6 +527,8 @@ func setGlobalVars() {
|
||||
session.SetStatsLease(statsLeaseDuration)
|
||||
indexUsageSyncLeaseDuration := parseDuration(cfg.Performance.IndexUsageSyncLease)
|
||||
session.SetIndexUsageSyncLease(indexUsageSyncLeaseDuration)
|
||||
planReplayerGCLease := parseDuration(cfg.Performance.PlanReplayerGCLease)
|
||||
session.SetPlanReplayerGCLease(planReplayerGCLease)
|
||||
bindinfo.Lease = parseDuration(cfg.Performance.BindInfoLease)
|
||||
domain.RunAutoAnalyze = cfg.Performance.RunAutoAnalyze
|
||||
statistics.FeedbackProbability.Store(cfg.Performance.FeedbackProbability)
|
||||
|
||||
Reference in New Issue
Block a user