From cfcc770e8a0451c9786fa288b910b784e36633f2 Mon Sep 17 00:00:00 2001
From: Weizhen Wang
Date: Mon, 22 Apr 2024 19:21:09 +0800
Subject: [PATCH] statistics: add upper bound of retry for sync load (#52658)

close pingcap/tidb#52657
---
 pkg/statistics/handle/syncload/BUILD.bazel   |  4 +-
 .../handle/syncload/stats_syncload.go        | 68 ++++++++------
 .../handle/syncload/stats_syncload_test.go   | 91 +++++++++++++++++++
 pkg/statistics/handle/types/interfaces.go    |  1 +
 4 files changed, 137 insertions(+), 27 deletions(-)

diff --git a/pkg/statistics/handle/syncload/BUILD.bazel b/pkg/statistics/handle/syncload/BUILD.bazel
index 365fadbfeb..ed6e310786 100644
--- a/pkg/statistics/handle/syncload/BUILD.bazel
+++ b/pkg/statistics/handle/syncload/BUILD.bazel
@@ -30,12 +30,14 @@ go_test(
     srcs = ["stats_syncload_test.go"],
     flaky = True,
     race = "on",
-    shard_count = 4,
+    shard_count = 5,
     deps = [
+        ":syncload",
         "//pkg/config",
         "//pkg/parser/model",
         "//pkg/sessionctx",
         "//pkg/sessionctx/stmtctx",
+        "//pkg/statistics/handle/types",
         "//pkg/testkit",
         "//pkg/util/mathutil",
         "@com_github_pingcap_failpoint//:failpoint",
diff --git a/pkg/statistics/handle/syncload/stats_syncload.go b/pkg/statistics/handle/syncload/stats_syncload.go
index 3e9db77161..723a2d77b3 100644
--- a/pkg/statistics/handle/syncload/stats_syncload.go
+++ b/pkg/statistics/handle/syncload/stats_syncload.go
@@ -36,6 +36,9 @@ import (
 	"go.uber.org/zap"
 )
 
+// RetryCount is the max retry count for a sync load task.
+const RetryCount = 3
+
 type statsSyncLoad struct {
 	statsHandle statstypes.StatsHandle
 	StatsLoad   statstypes.StatsLoad
@@ -204,6 +207,9 @@ func (s *statsSyncLoad) SubLoadWorker(sctx sessionctx.Context, exit chan struct{
 }
 
 // HandleOneTask handles last task if not nil, else handle a new task from chan, and return current task if fail somewhere.
+// - If the task is handled successfully, return nil, nil.
+// - If the task times out, return the task and nil. The caller should retry the timed-out task without sleeping.
+// - If the task fails, return the task and the error. The caller should retry the failed task after a sleep.
 func (s *statsSyncLoad) HandleOneTask(sctx sessionctx.Context, lastTask *statstypes.NeededItemTask, exit chan struct{}) (task *statstypes.NeededItemTask, err error) {
 	defer func() {
 		// recover for each task, worker keeps working
@@ -223,28 +229,42 @@ func (s *statsSyncLoad) HandleOneTask(sctx sessionctx.Context, lastTask *statsty
 	} else {
 		task = lastTask
 	}
+	result := stmtctx.StatsLoadResult{Item: task.Item.TableItemID}
 	resultChan := s.StatsLoad.Singleflight.DoChan(task.Item.Key(), func() (any, error) {
-		return s.handleOneItemTask(sctx, task)
+		err := s.handleOneItemTask(sctx, task)
+		return nil, err
 	})
 	timeout := time.Until(task.ToTimeout)
 	select {
-	case result := <-resultChan:
-		if result.Err == nil {
-			slr := result.Val.(*stmtctx.StatsLoadResult)
-			if slr.Error != nil {
-				return task, slr.Error
-			}
-			task.ResultCh <- *slr
+	case sr := <-resultChan:
+		// sr.Val is always nil.
+		if sr.Err == nil {
+			task.ResultCh <- result
 			return nil, nil
 		}
-		return task, result.Err
+		if !isVaildForRetry(task) {
+			result.Error = sr.Err
+			task.ResultCh <- result
+			return nil, nil
+		}
+		return task, sr.Err
 	case <-time.After(timeout):
+		if !isVaildForRetry(task) {
+			result.Error = errors.New("stats loading timeout")
+			task.ResultCh <- result
+			return nil, nil
+		}
 		task.ToTimeout.Add(time.Duration(sctx.GetSessionVars().StatsLoadSyncWait.Load()) * time.Microsecond)
 		return task, nil
 	}
 }
 
-func (s *statsSyncLoad) handleOneItemTask(sctx sessionctx.Context, task *statstypes.NeededItemTask) (result *stmtctx.StatsLoadResult, err error) {
+func isVaildForRetry(task *statstypes.NeededItemTask) bool {
+	task.Retry++
+	return task.Retry <= RetryCount
+}
+
+func (s *statsSyncLoad) handleOneItemTask(sctx sessionctx.Context, task *statstypes.NeededItemTask) (err error) {
 	defer func() {
 		// recover for each task, worker keeps working
 		if r := recover(); r != nil {
@@ -252,17 +272,16 @@ func (s *statsSyncLoad) handleOneItemTask(sctx sessionctx.Context, task *statsty
 			err = errors.Errorf("stats loading panicked: %v", r)
 		}
 	}()
-	result = &stmtctx.StatsLoadResult{Item: task.Item.TableItemID}
-	item := result.Item
+	item := task.Item.TableItemID
 	tbl, ok := s.statsHandle.Get(item.TableID)
 	if !ok {
-		return result, nil
+		return nil
 	}
 	wrapper := &statsWrapper{}
 	if item.IsIndex {
 		index, loadNeeded := tbl.IndexIsLoadNeeded(item.ID)
 		if !loadNeeded {
-			return result, nil
+			return nil
 		}
 		if index != nil {
 			wrapper.idxInfo = index.Info
@@ -272,7 +291,7 @@ func (s *statsSyncLoad) handleOneItemTask(sctx sessionctx.Context, task *statsty
 	} else {
 		col, loadNeeded, analyzed := tbl.ColumnIsLoadNeeded(item.ID, task.Item.FullLoad)
 		if !loadNeeded {
-			return result, nil
+			return nil
 		}
 		if col != nil {
 			wrapper.colInfo = col.Info
@@ -288,18 +307,15 @@ func (s *statsSyncLoad) handleOneItemTask(sctx sessionctx.Context, task *statsty
 				Histogram: *statistics.NewHistogram(item.ID, 0, 0, 0, &wrapper.colInfo.FieldType, 0, 0),
 				IsHandle:  tbl.IsPkIsHandle && mysql.HasPriKeyFlag(wrapper.colInfo.GetFlag()),
 			}
-			if s.updateCachedItem(item, wrapper.col, wrapper.idx, task.Item.FullLoad) {
-				return result, nil
-			}
-			return nil, nil
+			s.updateCachedItem(item, wrapper.col, wrapper.idx, task.Item.FullLoad)
+			return nil
 		}
 	}
 	t := time.Now()
 	needUpdate := false
 	wrapper, err = s.readStatsForOneItem(sctx, item, wrapper, tbl.IsPkIsHandle, task.Item.FullLoad)
 	if err != nil {
-		result.Error = err
-		return result, err
+		return err
 	}
 	if item.IsIndex {
 		if wrapper.idxInfo != nil {
@@ -311,10 +327,10 @@ func (s *statsSyncLoad) handleOneItemTask(sctx sessionctx.Context, task *statsty
 		}
 	}
 	metrics.ReadStatsHistogram.Observe(float64(time.Since(t).Milliseconds()))
-	if needUpdate && s.updateCachedItem(item, wrapper.col, wrapper.idx, task.Item.FullLoad) {
-		return result, nil
+	if needUpdate {
+		s.updateCachedItem(item, wrapper.col, wrapper.idx, task.Item.FullLoad)
 	}
-	return nil, nil
+	return nil
 }
 
 // readStatsForOneItem reads hist for one column/index, TODO load data via kv-get asynchronously
@@ -492,14 +508,14 @@ func (s *statsSyncLoad) updateCachedItem(item model.TableItemID, colHist *statis
 	// like `GetPartitionStats` called in `fmSketchFromStorage` would have modified the stats cache already.
 	tbl, ok := s.statsHandle.Get(item.TableID)
 	if !ok {
-		return true
+		return false
 	}
 	if !item.IsIndex && colHist != nil {
 		c, ok := tbl.Columns[item.ID]
 		// - If the stats is fully loaded,
 		// - If the stats is meta-loaded and we also just need the meta.
 		if ok && (c.IsFullLoad() || !fullLoaded) {
-			return true
+			return false
 		}
 		tbl = tbl.Copy()
 		tbl.Columns[item.ID] = colHist
diff --git a/pkg/statistics/handle/syncload/stats_syncload_test.go b/pkg/statistics/handle/syncload/stats_syncload_test.go
index 01706d668d..e20b615b7e 100644
--- a/pkg/statistics/handle/syncload/stats_syncload_test.go
+++ b/pkg/statistics/handle/syncload/stats_syncload_test.go
@@ -23,6 +23,8 @@ import (
 	"github.com/pingcap/tidb/pkg/parser/model"
 	"github.com/pingcap/tidb/pkg/sessionctx"
 	"github.com/pingcap/tidb/pkg/sessionctx/stmtctx"
+	"github.com/pingcap/tidb/pkg/statistics/handle/syncload"
+	"github.com/pingcap/tidb/pkg/statistics/handle/types"
 	"github.com/pingcap/tidb/pkg/testkit"
 	"github.com/pingcap/tidb/pkg/util/mathutil"
 	"github.com/stretchr/testify/require"
@@ -206,6 +208,18 @@ func TestConcurrentLoadHistWithPanicAndFail(t *testing.T) {
 	task1, err1 := h.HandleOneTask(testKit.Session().(sessionctx.Context), nil, exitCh)
 	require.Error(t, err1)
 	require.NotNil(t, task1)
+	select {
+	case <-stmtCtx1.StatsLoad.ResultCh:
+		t.Logf("stmtCtx1.ResultCh should not get anything")
+		t.FailNow()
+	case <-stmtCtx2.StatsLoad.ResultCh:
+		t.Logf("stmtCtx2.ResultCh should not get anything")
+		t.FailNow()
+	case <-task1.ResultCh:
+		t.Logf("task1.ResultCh should not get anything")
+		t.FailNow()
+	default:
+	}
 	require.NoError(t, failpoint.Disable(fp.failPath))
 
 	task3, err3 := h.HandleOneTask(testKit.Session().(sessionctx.Context), task1, exitCh)
@@ -229,3 +243,80 @@ func TestConcurrentLoadHistWithPanicAndFail(t *testing.T) {
 		require.Greater(t, hg.Len()+topn.Num(), 0)
 	}
 }
+
+func TestRetry(t *testing.T) {
+	originConfig := config.GetGlobalConfig()
+	newConfig := config.NewConfig()
+	newConfig.Performance.StatsLoadConcurrency = 0 // no worker to consume channel
+	config.StoreGlobalConfig(newConfig)
+	defer config.StoreGlobalConfig(originConfig)
+	store, dom := testkit.CreateMockStoreAndDomain(t)
+
+	testKit := testkit.NewTestKit(t, store)
+	testKit.MustExec("use test")
+	testKit.MustExec("drop table if exists t")
+	testKit.MustExec("set @@session.tidb_analyze_version=2")
+	testKit.MustExec("create table t(a int, b int, c int, primary key(a), key idx(b))")
+	testKit.MustExec("insert into t values (1,1,1),(2,2,2),(3,3,3)")
+
+	oriLease := dom.StatsHandle().Lease()
+	dom.StatsHandle().SetLease(1)
+	defer func() {
+		dom.StatsHandle().SetLease(oriLease)
+	}()
+	testKit.MustExec("analyze table t")
+
+	is := dom.InfoSchema()
+	tbl, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
+	require.NoError(t, err)
+	tableInfo := tbl.Meta()
+
+	h := dom.StatsHandle()
+
+	neededColumns := make([]model.StatsLoadItem, 1)
+	neededColumns[0] = model.StatsLoadItem{TableItemID: model.TableItemID{TableID: tableInfo.ID, ID: tableInfo.Columns[2].ID, IsIndex: false}, FullLoad: true}
+	timeout := time.Nanosecond * mathutil.MaxInt
+
+	// clear statsCache
+	h.Clear()
+	require.NoError(t, dom.StatsHandle().Update(is))
+
+	// no stats at beginning
+	stat := h.GetTableStats(tableInfo)
+	c, ok := stat.Columns[tableInfo.Columns[2].ID]
+	require.True(t, !ok || (c.Histogram.Len()+c.TopN.Num() == 0))
+
+	stmtCtx1 := stmtctx.NewStmtCtx()
+	h.SendLoadRequests(stmtCtx1, neededColumns, timeout)
+	stmtCtx2 := stmtctx.NewStmtCtx()
+	h.SendLoadRequests(stmtCtx2, neededColumns, timeout)
+
+	exitCh := make(chan struct{})
+	require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/statistics/handle/syncload/mockReadStatsForOneFail", "return(true)"))
+	var (
+		task1 *types.NeededItemTask
+		err1  error
+	)
+
+	for i := 0; i < syncload.RetryCount; i++ {
+		task1, err1 = h.HandleOneTask(testKit.Session().(sessionctx.Context), task1, exitCh)
+		require.Error(t, err1)
+		require.NotNil(t, task1)
+		select {
+		case <-task1.ResultCh:
+			t.Logf("task1.ResultCh should not get anything")
+			t.FailNow()
+		default:
+		}
+	}
+	result, err1 := h.HandleOneTask(testKit.Session().(sessionctx.Context), task1, exitCh)
+	require.NoError(t, err1)
+	require.Nil(t, result)
+	select {
+	case <-task1.ResultCh:
+	default:
+		t.Logf("task1.ResultCh should get a result")
+		t.FailNow()
+	}
+	require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/statistics/handle/syncload/mockReadStatsForOneFail"))
+}
diff --git a/pkg/statistics/handle/types/interfaces.go b/pkg/statistics/handle/types/interfaces.go
index d3a4ee7263..5c1b41d7fb 100644
--- a/pkg/statistics/handle/types/interfaces.go
+++ b/pkg/statistics/handle/types/interfaces.go
@@ -370,6 +370,7 @@ type NeededItemTask struct {
 	ToTimeout time.Time
 	ResultCh  chan stmtctx.StatsLoadResult
 	Item      model.StatsLoadItem
+	Retry     int
 }
 
 // StatsLoad is used to load stats concurrently
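For readers skimming the diff, the retry contract can be summarized outside of TiDB's types. Below is a minimal, self-contained sketch of the bounded-retry pattern this patch introduces; task, handleOneTask, and validForRetry are illustrative stand-ins rather than the actual TiDB APIs, and retryCount merely mirrors the new RetryCount constant. After at most retryCount attempts, the error is delivered on the task's result channel instead of handing the task back to the caller.

// bounded_retry_sketch.go (illustrative only, not TiDB code)
package main

import (
	"errors"
	"fmt"
)

const retryCount = 3 // mirrors syncload.RetryCount

type task struct {
	retry    int
	resultCh chan error
}

// validForRetry plays the role of isVaildForRetry in the patch: bump the
// per-task counter and allow at most retryCount attempts.
func validForRetry(t *task) bool {
	t.retry++
	return t.retry <= retryCount
}

// handleOneTask mimics HandleOneTask's contract: on success or once the
// retry budget is spent, the waiting reader gets an answer on resultCh and
// nil is returned; otherwise the task is handed back for another attempt.
func handleOneTask(t *task, load func() error) (*task, error) {
	err := load()
	if err == nil {
		t.resultCh <- nil
		return nil, nil
	}
	if !validForRetry(t) {
		t.resultCh <- err // final answer for the waiting reader
		return nil, nil
	}
	return t, err // caller retries (after a sleep in the real worker)
}

func main() {
	t := &task{resultCh: make(chan error, 1)}
	alwaysFail := func() error { return errors.New("mock read failure") }

	// Worker loop: keep feeding the returned task back in until it is consumed.
	for cur := t; cur != nil; {
		next, err := handleOneTask(cur, alwaysFail)
		fmt.Printf("attempt %d: task returned=%v err=%v\n", cur.retry, next != nil, err)
		cur = next
	}
	fmt.Println("result delivered:", <-t.resultCh) // "mock read failure"
}

Run as a plain Go program, the loop hands the task back three times and the fourth call gives up and pushes the error to resultCh, which is exactly the behavior TestRetry asserts against HandleOneTask.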