domain: use safe interface to run internal SQLs when GC plan replayer dump files (#43648)
close pingcap/tidb#43622
This commit is contained in:
@ -1964,6 +1964,11 @@ func (do *Domain) GetExtractHandle() *ExtractHandle {
|
||||
return do.extractTaskHandle
|
||||
}
|
||||
|
||||
// GetDumpFileGCChecker returns dump file GC checker for plan replayer and plan trace
|
||||
func (do *Domain) GetDumpFileGCChecker() *dumpFileGcChecker {
|
||||
return do.dumpFileGcChecker
|
||||
}
|
||||
|
||||
// DumpFileGcCheckerLoop creates a goroutine that handles `exit` and `gc`.
|
||||
func (do *Domain) DumpFileGcCheckerLoop() {
|
||||
do.wg.Run(func() {
|
||||
@ -1979,7 +1984,7 @@ func (do *Domain) DumpFileGcCheckerLoop() {
|
||||
case <-do.exit:
|
||||
return
|
||||
case <-gcTicker.C:
|
||||
do.dumpFileGcChecker.gcDumpFiles(time.Hour, time.Hour*24*7)
|
||||
do.dumpFileGcChecker.GCDumpFiles(time.Hour, time.Hour*24*7)
|
||||
}
|
||||
}
|
||||
}, "dumpFileGcChecker")
|
||||
|
||||
@ -73,7 +73,8 @@ func parseTime(s string) (time.Time, error) {
|
||||
return time.Unix(0, i), nil
|
||||
}
|
||||
|
||||
func (p *dumpFileGcChecker) gcDumpFiles(gcDurationDefault, gcDurationForCapture time.Duration) {
|
||||
// GCDumpFiles periodically cleans the outdated files for plan replayer and plan trace.
|
||||
func (p *dumpFileGcChecker) GCDumpFiles(gcDurationDefault, gcDurationForCapture time.Duration) {
|
||||
p.Lock()
|
||||
defer p.Unlock()
|
||||
for _, path := range p.paths {
|
||||
@ -127,8 +128,8 @@ func (p *dumpFileGcChecker) gcDumpFilesByPath(path string, gcDurationDefault, gc
|
||||
|
||||
func deletePlanReplayerStatus(ctx context.Context, sctx sessionctx.Context, token string) {
|
||||
ctx1 := kv.WithInternalSourceType(ctx, kv.InternalTxnStats)
|
||||
exec := sctx.(sqlexec.SQLExecutor)
|
||||
_, err := exec.ExecuteInternal(ctx1, fmt.Sprintf("delete from mysql.plan_replayer_status where token = %v", token))
|
||||
exec := sctx.(sqlexec.RestrictedSQLExecutor)
|
||||
_, _, err := exec.ExecRestrictedSQL(ctx1, nil, "delete from mysql.plan_replayer_status where token = %?", token)
|
||||
if err != nil {
|
||||
logutil.BgLogger().Warn("delete mysql.plan_replayer_status record failed", zap.String("token", token), zap.Error(err))
|
||||
}
|
||||
|
||||
@ -16,9 +16,13 @@ package domain_test
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/pingcap/tidb/testkit"
|
||||
"github.com/pingcap/tidb/util/replayer"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
@ -119,3 +123,27 @@ func TestPlanReplayerHandleDumpTask(t *testing.T) {
|
||||
// assert capture * task still remained
|
||||
require.Len(t, prHandle.GetTasks(), 1)
|
||||
}
|
||||
|
||||
func TestPlanReplayerGC(t *testing.T) {
|
||||
store, dom := testkit.CreateMockStoreAndDomain(t)
|
||||
tk := testkit.NewTestKit(t, store)
|
||||
handler := dom.GetDumpFileGCChecker()
|
||||
|
||||
startTime := time.Now()
|
||||
time := startTime.UnixNano()
|
||||
fileName := fmt.Sprintf("replayer_single_xxxxxx_%v.zip", time)
|
||||
err := os.MkdirAll(replayer.GetPlanReplayerDirName(), os.ModePerm)
|
||||
require.NoError(t, err)
|
||||
tk.MustExec("insert into mysql.plan_replayer_status(sql_digest, plan_digest, token, instance) values" +
|
||||
"('123','123','" + fileName + "','123')")
|
||||
path := filepath.Join(replayer.GetPlanReplayerDirName(), fileName)
|
||||
zf, err := os.Create(path)
|
||||
require.NoError(t, err)
|
||||
zf.Close()
|
||||
handler.GCDumpFiles(0, 0)
|
||||
tk.MustQuery("select count(*) from mysql.plan_replayer_status").Check(testkit.Rows("0"))
|
||||
|
||||
_, err = os.Stat(path)
|
||||
require.NotNil(t, err)
|
||||
require.True(t, os.IsNotExist(err))
|
||||
}
|
||||
|
||||
@ -16,7 +16,6 @@ package domain
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
"time"
|
||||
@ -26,24 +25,6 @@ import (
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestPlanReplayerGC(t *testing.T) {
|
||||
startTime := time.Now()
|
||||
time := startTime.UnixNano()
|
||||
fileName := fmt.Sprintf("replayer_single_xxxxxx_%v.zip", time)
|
||||
err := os.MkdirAll(replayer.GetPlanReplayerDirName(), os.ModePerm)
|
||||
require.NoError(t, err)
|
||||
path := filepath.Join(replayer.GetPlanReplayerDirName(), fileName)
|
||||
zf, err := os.Create(path)
|
||||
require.NoError(t, err)
|
||||
zf.Close()
|
||||
|
||||
handler := &dumpFileGcChecker{
|
||||
paths: []string{replayer.GetPlanReplayerDirName()},
|
||||
}
|
||||
handler.gcDumpFiles(0, 0)
|
||||
require.NoFileExists(t, path)
|
||||
}
|
||||
|
||||
func TestPlanReplayerDifferentGC(t *testing.T) {
|
||||
dirName := replayer.GetPlanReplayerDirName()
|
||||
|
||||
@ -82,13 +63,13 @@ func TestPlanReplayerDifferentGC(t *testing.T) {
|
||||
handler := &dumpFileGcChecker{
|
||||
paths: []string{dirName},
|
||||
}
|
||||
handler.gcDumpFiles(time.Hour, time.Hour*24*7)
|
||||
handler.GCDumpFiles(time.Hour, time.Hour*24*7)
|
||||
require.NoFileExists(t, filePath1)
|
||||
require.FileExists(t, filePath2)
|
||||
require.NoFileExists(t, filePath3)
|
||||
require.FileExists(t, filePath4)
|
||||
|
||||
handler.gcDumpFiles(0, 0)
|
||||
handler.GCDumpFiles(0, 0)
|
||||
require.NoFileExists(t, filePath2)
|
||||
require.NoFileExists(t, filePath4)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user