diff --git a/executor/adapter.go b/executor/adapter.go
index 6ee10da05f..4a648f4029 100644
--- a/executor/adapter.go
+++ b/executor/adapter.go
@@ -1296,7 +1296,15 @@ func (a *ExecStmt) LogSlowQuery(txnTS uint64, succ bool, hasMoreResults bool) {
 		ExecRetryCount:    a.retryCount,
 		IsExplicitTxn:     sessVars.TxnCtx.IsExplicit,
 		IsWriteCacheTable: stmtCtx.WaitLockLeaseTime > 0,
+		IsSyncStatsFailed: stmtCtx.IsSyncStatsFailed,
 	}
+	failpoint.Inject("assertSyncStatsFailed", func(val failpoint.Value) {
+		if val.(bool) {
+			if !slowItems.IsSyncStatsFailed {
+				panic("isSyncStatsFailed should be true")
+			}
+		}
+	})
 	if a.retryCount > 0 {
 		slowItems.ExecRetryTime = costTime - sessVars.DurationParse - sessVars.DurationCompile - time.Since(a.retryStartTime)
 	}
diff --git a/executor/executor.go b/executor/executor.go
index 678eb3d39c..fc177713d7 100644
--- a/executor/executor.go
+++ b/executor/executor.go
@@ -1925,6 +1925,7 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) {
 	sc.EnableOptimizeTrace = false
 	sc.OptimizeTracer = nil
 	sc.OptimizerCETrace = nil
+	sc.IsSyncStatsFailed = false
 
 	sc.SysdateIsNow = ctx.GetSessionVars().SysdateIsNow
diff --git a/planner/core/plan_stats.go b/planner/core/plan_stats.go
index 4fbf5e4772..4a8e38b01a 100644
--- a/planner/core/plan_stats.go
+++ b/planner/core/plan_stats.go
@@ -25,7 +25,9 @@ import (
 	"github.com/pingcap/tidb/sessionctx"
 	"github.com/pingcap/tidb/sessionctx/stmtctx"
 	"github.com/pingcap/tidb/sessionctx/variable"
+	"github.com/pingcap/tidb/util/logutil"
 	"github.com/pingcap/tidb/util/mathutil"
+	"go.uber.org/zap"
 )
 
 type collectPredicateColumnsPoint struct{}
@@ -41,6 +43,9 @@ func (c collectPredicateColumnsPoint) optimize(_ context.Context, plan LogicalPl
 	if len(predicateColumns) > 0 {
 		plan.SCtx().UpdateColStatsUsage(predicateColumns)
 	}
+	if !histNeeded {
+		return plan, nil
+	}
 	histNeededIndices := collectSyncIndices(plan.SCtx(), histNeededColumns)
 	histNeededItems := collectHistNeededItems(histNeededColumns, histNeededIndices)
 	if histNeeded && len(histNeededItems) > 0 {
@@ -85,6 +90,8 @@ func RequestLoadStats(ctx sessionctx.Context, neededHistItems []model.TableItemI
 	var timeout = time.Duration(waitTime)
 	err := domain.GetDomain(ctx).StatsHandle().SendLoadRequests(stmtCtx, neededHistItems, timeout)
 	if err != nil {
+		logutil.BgLogger().Warn("SendLoadRequests failed", zap.Error(err))
+		stmtCtx.IsSyncStatsFailed = true
 		return handleTimeout(stmtCtx)
 	}
 	return nil
@@ -100,6 +107,8 @@ func SyncWaitStatsLoad(plan LogicalPlan) (bool, error) {
 	if success {
 		return true, nil
 	}
+	logutil.BgLogger().Warn("SyncWaitStatsLoad failed")
+	stmtCtx.IsSyncStatsFailed = true
 	err := handleTimeout(stmtCtx)
 	return false, err
 }
diff --git a/planner/core/plan_stats_test.go b/planner/core/plan_stats_test.go
index 84678bc7ca..8f23858cda 100644
--- a/planner/core/plan_stats_test.go
+++ b/planner/core/plan_stats_test.go
@@ -20,6 +20,7 @@ import (
 	"testing"
 	"time"
 
+	"github.com/pingcap/failpoint"
 	"github.com/pingcap/tidb/config"
 	"github.com/pingcap/tidb/executor"
 	"github.com/pingcap/tidb/parser"
@@ -27,7 +28,9 @@ import (
 	"github.com/pingcap/tidb/planner"
 	plannercore "github.com/pingcap/tidb/planner/core"
 	"github.com/pingcap/tidb/sessionctx"
+	"github.com/pingcap/tidb/sessionctx/stmtctx"
 	"github.com/pingcap/tidb/statistics"
+	"github.com/pingcap/tidb/statistics/handle"
 	"github.com/pingcap/tidb/testkit"
 	"github.com/stretchr/testify/require"
 )
@@ -259,15 +262,26 @@ func TestPlanStatsLoadTimeout(t *testing.T) {
 	require.NoError(t, err)
 	tableInfo := tbl.Meta()
 	neededColumn := model.TableItemID{TableID: tableInfo.ID, ID: tableInfo.Columns[0].ID, IsIndex: false}
-	resultCh := make(chan model.TableItemID, 1)
+	resultCh := make(chan stmtctx.StatsLoadResult, 1)
 	timeout := time.Duration(1<<63 - 1)
-	dom.StatsHandle().AppendNeededItem(neededColumn, resultCh, timeout) // make channel queue full
-	stmt, err := p.ParseOneStmt("select * from t where c>1", "", "")
+	task := &handle.NeededItemTask{
+		TableItemID: neededColumn,
+		ResultCh:    resultCh,
+		ToTimeout:   time.Now().Local().Add(timeout),
+	}
+	dom.StatsHandle().AppendNeededItem(task, timeout) // make channel queue full
+	sql := "select * from t where c>1"
+	stmt, err := p.ParseOneStmt(sql, "", "")
 	require.NoError(t, err)
 	tk.MustExec("set global tidb_stats_load_pseudo_timeout=false")
 	_, _, err = planner.Optimize(context.TODO(), ctx, stmt, is)
 	require.Error(t, err) // fail sql for timeout when pseudo=false
 
+	tk.MustExec("set global tidb_stats_load_pseudo_timeout=true")
+	require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/assertSyncStatsFailed", `return(true)`))
+	tk.MustExec(sql) // not fail sql for timeout when pseudo=true
+	failpoint.Disable("github.com/pingcap/tidb/executor/assertSyncStatsFailed")
+
 	plan, _, err := planner.Optimize(context.TODO(), ctx, stmt, is)
 	require.NoError(t, err) // not fail sql for timeout when pseudo=true
 	switch pp := plan.(type) {
diff --git a/sessionctx/stmtctx/stmtctx.go b/sessionctx/stmtctx/stmtctx.go
index a13b80a78a..98f2c6b27a 100644
--- a/sessionctx/stmtctx/stmtctx.go
+++ b/sessionctx/stmtctx/stmtctx.go
@@ -15,6 +15,7 @@
 package stmtctx
 
 import (
+	"bytes"
 	"encoding/json"
 	"math"
 	"strconv"
@@ -293,7 +294,7 @@ type StatementContext struct {
 		// NeededItems stores the columns/indices whose stats are needed for planner.
 		NeededItems []model.TableItemID
 		// ResultCh to receive stats loading results
-		ResultCh chan model.TableItemID
+		ResultCh chan StatsLoadResult
 		// Fallback indicates if the planner uses full-loaded stats or fallback all to pseudo/simple.
 		Fallback bool
 		// LoadStartTime is to record the load start time to calculate latency
@@ -310,6 +311,9 @@ type StatementContext struct {
 	IsSQLRegistered atomic2.Bool
 	// IsSQLAndPlanRegistered uses to indicate whether the SQL and plan has been registered for TopSQL.
 	IsSQLAndPlanRegistered atomic2.Bool
+
+	// IsSyncStatsFailed indicates whether any failure happened during sync stats
+	IsSyncStatsFailed bool
 }
 
 // StmtHints are SessionVars related sql hints.
@@ -1014,3 +1018,30 @@ func (d *CopTasksDetails) ToZapFields() (fields []zap.Field) {
 	fields = append(fields, zap.String("wait_max_addr", d.MaxWaitAddress))
 	return fields
 }
+
+// StatsLoadResult indicates result for StatsLoad
+type StatsLoadResult struct {
+	Item  model.TableItemID
+	Error error
+}
+
+// HasError returns whether result has error
+func (r StatsLoadResult) HasError() bool {
+	return r.Error != nil
+}
+
+// ErrorMsg returns StatsLoadResult err msg
+func (r StatsLoadResult) ErrorMsg() string {
+	if r.Error == nil {
+		return ""
+	}
+	b := bytes.NewBufferString("tableID:")
+	b.WriteString(strconv.FormatInt(r.Item.TableID, 10))
+	b.WriteString(", id:")
+	b.WriteString(strconv.FormatInt(r.Item.ID, 10))
+	b.WriteString(", isIndex:")
+	b.WriteString(strconv.FormatBool(r.Item.IsIndex))
+	b.WriteString(", err:")
+	b.WriteString(r.Error.Error())
+	return b.String()
+}
diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go
index a9bdcb8a60..4a03cc3317 100644
--- a/sessionctx/variable/session.go
+++ b/sessionctx/variable/session.go
@@ -2526,6 +2526,8 @@ const (
 	SlowLogIsExplicitTxn = "IsExplicitTxn"
 	// SlowLogIsWriteCacheTable is used to indicate whether writing to the cache table need to wait for the read lock to expire.
 	SlowLogIsWriteCacheTable = "IsWriteCacheTable"
+	// SlowLogIsSyncStatsFailed is used to indicate whether any failure happened during sync stats
+	SlowLogIsSyncStatsFailed = "IsSyncStatsFailed"
 )
 
 // GenerateBinaryPlan decides whether we should record binary plan in slow log and stmt summary.
@@ -2568,6 +2570,7 @@ type SlowQueryLogItems struct {
 	ResultRows        int64
 	IsExplicitTxn     bool
 	IsWriteCacheTable bool
+	IsSyncStatsFailed bool
 }
 
 // SlowLogFormat uses for formatting slow log.
@@ -2732,6 +2735,7 @@ func (s *SessionVars) SlowLogFormat(logItems *SlowQueryLogItems) string {
 	writeSlowLogItem(&buf, SlowLogResultRows, strconv.FormatInt(logItems.ResultRows, 10))
 	writeSlowLogItem(&buf, SlowLogSucc, strconv.FormatBool(logItems.Succ))
 	writeSlowLogItem(&buf, SlowLogIsExplicitTxn, strconv.FormatBool(logItems.IsExplicitTxn))
+	writeSlowLogItem(&buf, SlowLogIsSyncStatsFailed, strconv.FormatBool(logItems.IsSyncStatsFailed))
 	if s.StmtCtx.WaitLockLeaseTime > 0 {
 		writeSlowLogItem(&buf, SlowLogIsWriteCacheTable, strconv.FormatBool(logItems.IsWriteCacheTable))
 	}
diff --git a/sessionctx/variable/session_test.go b/sessionctx/variable/session_test.go
index 0776dd6083..5e47a9da45 100644
--- a/sessionctx/variable/session_test.go
+++ b/sessionctx/variable/session_test.go
@@ -232,6 +232,7 @@ func TestSlowLogFormat(t *testing.T) {
 # Result_rows: 12345
 # Succ: true
 # IsExplicitTxn: true
+# IsSyncStatsFailed: false
 # IsWriteCacheTable: true`
 	sql := "select * from t;"
 	_, digest := parser.NormalizeDigest(sql)
diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go
index e1bae81b22..dbd38bd774 100644
--- a/sessionctx/variable/tidb_vars.go
+++ b/sessionctx/variable/tidb_vars.go
@@ -971,7 +971,7 @@ const (
 	DefTiDBPersistAnalyzeOptions = true
 	DefTiDBEnableColumnTracking = false
 	DefTiDBStatsLoadSyncWait = 0
-	DefTiDBStatsLoadPseudoTimeout = false
+	DefTiDBStatsLoadPseudoTimeout = true
 	DefSysdateIsNow = false
 	DefTiDBEnableMutationChecker = false
 	DefTiDBTxnAssertionLevel = AssertionOffStr
diff --git a/statistics/handle/handle.go b/statistics/handle/handle.go
index fdf592f364..6c5ca30353 100644
--- a/statistics/handle/handle.go
+++ b/statistics/handle/handle.go
@@ -215,7 +215,7 @@ func NewHandle(ctx sessionctx.Context, lease time.Duration, pool sessionPool, tr
 	handle.StatsLoad.SubCtxs = make([]sessionctx.Context, cfg.Performance.StatsLoadConcurrency)
 	handle.StatsLoad.NeededItemsCh = make(chan *NeededItemTask, cfg.Performance.StatsLoadQueueSize)
 	handle.StatsLoad.TimeoutItemsCh = make(chan *NeededItemTask, cfg.Performance.StatsLoadQueueSize)
-	handle.StatsLoad.WorkingColMap = map[model.TableItemID][]chan model.TableItemID{}
+	handle.StatsLoad.WorkingColMap = map[model.TableItemID][]chan stmtctx.StatsLoadResult{}
 	err := handle.RefreshVars()
 	if err != nil {
 		return nil, err
diff --git a/statistics/handle/handle_hist.go b/statistics/handle/handle_hist.go
index 44423794fb..8e87308e22 100644
--- a/statistics/handle/handle_hist.go
+++ b/statistics/handle/handle_hist.go
@@ -46,14 +46,14 @@ type StatsLoad struct {
 	SubCtxs        []sessionctx.Context
 	NeededItemsCh  chan *NeededItemTask
 	TimeoutItemsCh chan *NeededItemTask
-	WorkingColMap  map[model.TableItemID][]chan model.TableItemID
+	WorkingColMap  map[model.TableItemID][]chan stmtctx.StatsLoadResult
 }
 
 // NeededItemTask represents one needed column/indices with expire time.
 type NeededItemTask struct {
 	TableItemID model.TableItemID
 	ToTimeout   time.Time
-	ResultCh    chan model.TableItemID
+	ResultCh    chan stmtctx.StatsLoadResult
 }
 
 // SendLoadRequests send neededColumns requests
@@ -64,9 +64,14 @@ func (h *Handle) SendLoadRequests(sc *stmtctx.StatementContext, neededHistItems
 	}
 	sc.StatsLoad.Timeout = timeout
 	sc.StatsLoad.NeededItems = remainedItems
-	sc.StatsLoad.ResultCh = make(chan model.TableItemID, len(remainedItems))
-	for _, col := range remainedItems {
-		err := h.AppendNeededItem(col, sc.StatsLoad.ResultCh, timeout)
+	sc.StatsLoad.ResultCh = make(chan stmtctx.StatsLoadResult, len(remainedItems))
+	for _, item := range remainedItems {
+		task := &NeededItemTask{
+			TableItemID: item,
+			ToTimeout:   time.Now().Local().Add(timeout),
+			ResultCh:    sc.StatsLoad.ResultCh,
+		}
+		err := h.AppendNeededItem(task, timeout)
 		if err != nil {
 			return err
 		}
@@ -80,7 +85,12 @@ func (h *Handle) SyncWaitStatsLoad(sc *stmtctx.StatementContext) bool {
 	if len(sc.StatsLoad.NeededItems) <= 0 {
 		return true
 	}
+	var errorMsgs []string
 	defer func() {
+		if len(errorMsgs) > 0 {
+			logutil.BgLogger().Warn("SyncWaitStatsLoad meets error",
+				zap.Strings("errors", errorMsgs))
+		}
 		sc.StatsLoad.NeededItems = nil
 	}()
 	resultCheckMap := map[model.TableItemID]struct{}{}
@@ -94,7 +104,10 @@ func (h *Handle) SyncWaitStatsLoad(sc *stmtctx.StatementContext) bool {
 		select {
 		case result, ok := <-sc.StatsLoad.ResultCh:
 			if ok {
-				delete(resultCheckMap, result)
+				if result.HasError() {
+					errorMsgs = append(errorMsgs, result.ErrorMsg())
+				}
+				delete(resultCheckMap, result.Item)
 				if len(resultCheckMap) == 0 {
 					metrics.SyncLoadHistogram.Observe(float64(time.Since(sc.StatsLoad.LoadStartTime).Milliseconds()))
 					return true
@@ -104,6 +117,7 @@ func (h *Handle) SyncWaitStatsLoad(sc *stmtctx.StatementContext) bool {
 			}
 		case <-timer.C:
 			metrics.SyncLoadTimeoutCounter.Inc()
+			logutil.BgLogger().Warn("SyncWaitStatsLoad timeout")
 			return false
 		}
 	}
@@ -134,9 +148,7 @@ func (h *Handle) removeHistLoadedColumns(neededItems []model.TableItemID) []mode
 }
 
 // AppendNeededItem appends needed columns/indices to ch, if exists, do not append the duplicated one.
-func (h *Handle) AppendNeededItem(item model.TableItemID, resultCh chan model.TableItemID, timeout time.Duration) error {
-	toTimout := time.Now().Local().Add(timeout)
-	task := &NeededItemTask{TableItemID: item, ToTimeout: toTimout, ResultCh: resultCh}
+func (h *Handle) AppendNeededItem(task *NeededItemTask, timeout time.Duration) error {
 	return h.writeToChanWithTimeout(h.StatsLoad.NeededItemsCh, task, timeout)
 }
 
@@ -202,11 +214,12 @@ func (h *Handle) HandleOneTask(lastTask *NeededItemTask, readerCtx *StatsReaderC
 }
 
 func (h *Handle) handleOneItemTask(task *NeededItemTask, readerCtx *StatsReaderContext, ctx sqlexec.RestrictedSQLExecutor) (*NeededItemTask, error) {
-	item := task.TableItemID
+	result := stmtctx.StatsLoadResult{Item: task.TableItemID}
+	item := result.Item
 	oldCache := h.statsCache.Load().(statsCache)
 	tbl, ok := oldCache.Get(item.TableID)
 	if !ok {
-		h.writeToResultChan(task.ResultCh, item)
+		h.writeToResultChan(task.ResultCh, result)
 		return nil, nil
 	}
 	var err error
@@ -214,22 +227,22 @@ func (h *Handle) handleOneItemTask(task *NeededItemTask, readerCtx *StatsReaderC
 	if item.IsIndex {
 		index, ok := tbl.Indices[item.ID]
 		if !ok || index.IsFullLoad() {
-			h.writeToResultChan(task.ResultCh, item)
+			h.writeToResultChan(task.ResultCh, result)
 			return nil, nil
 		}
 		wrapper.idx = index
 	} else {
 		col, ok := tbl.Columns[item.ID]
 		if !ok || col.IsFullLoad() {
-			h.writeToResultChan(task.ResultCh, item)
+			h.writeToResultChan(task.ResultCh, result)
 			return nil, nil
 		}
 		wrapper.col = col
 	}
 	// to avoid duplicated handling in concurrent scenario
-	working := h.setWorking(task.TableItemID, task.ResultCh)
+	working := h.setWorking(result.Item, task.ResultCh)
 	if !working {
-		h.writeToResultChan(task.ResultCh, item)
+		h.writeToResultChan(task.ResultCh, result)
 		return nil, nil
 	}
 	// refresh statsReader to get latest stats
@@ -238,6 +251,7 @@ func (h *Handle) handleOneItemTask(task *NeededItemTask, readerCtx *StatsReaderC
 	needUpdate := false
 	wrapper, err = h.readStatsForOneItem(item, wrapper, readerCtx.reader)
 	if err != nil {
+		result.Error = err
 		return task, err
 	}
 	if item.IsIndex {
@@ -251,9 +265,9 @@ func (h *Handle) handleOneItemTask(task *NeededItemTask, readerCtx *StatsReaderC
 	}
 	metrics.ReadStatsHistogram.Observe(float64(time.Since(t).Milliseconds()))
 	if needUpdate && h.updateCachedItem(item, wrapper.col, wrapper.idx) {
-		h.writeToResultChan(task.ResultCh, item)
+		h.writeToResultChan(task.ResultCh, result)
 	}
-	h.finishWorking(item)
+	h.finishWorking(result)
 	return nil, nil
 }
 
@@ -425,7 +439,7 @@ func (h *Handle) writeToChanWithTimeout(taskCh chan *NeededItemTask, task *Neede
 }
 
 // writeToResultChan safe-writes with panic-recover so one write-fail will not have big impact.
-func (h *Handle) writeToResultChan(resultCh chan model.TableItemID, rs model.TableItemID) {
+func (h *Handle) writeToResultChan(resultCh chan stmtctx.StatsLoadResult, rs stmtctx.StatsLoadResult) {
 	defer func() {
 		if r := recover(); r != nil {
 			logutil.BgLogger().Error("writeToResultChan panicked", zap.Any("error", r), zap.Stack("stack"))
@@ -466,7 +480,7 @@ func (h *Handle) updateCachedItem(item model.TableItemID, colHist *statistics.Co
 	return h.updateStatsCache(oldCache.update([]*statistics.Table{tbl}, nil, oldCache.version, WithTableStatsByQuery()))
 }
 
-func (h *Handle) setWorking(item model.TableItemID, resultCh chan model.TableItemID) bool {
+func (h *Handle) setWorking(item model.TableItemID, resultCh chan stmtctx.StatsLoadResult) bool {
 	h.StatsLoad.Lock()
 	defer h.StatsLoad.Unlock()
 	chList, ok := h.StatsLoad.WorkingColMap[item]
@@ -477,20 +491,20 @@ func (h *Handle) setWorking(item model.TableItemID, resultCh chan model.TableIte
 		h.StatsLoad.WorkingColMap[item] = append(chList, resultCh)
 		return false
 	}
-	chList = []chan model.TableItemID{}
+	chList = []chan stmtctx.StatsLoadResult{}
 	chList = append(chList, resultCh)
 	h.StatsLoad.WorkingColMap[item] = chList
 	return true
 }
 
-func (h *Handle) finishWorking(item model.TableItemID) {
+func (h *Handle) finishWorking(result stmtctx.StatsLoadResult) {
 	h.StatsLoad.Lock()
 	defer h.StatsLoad.Unlock()
-	if chList, ok := h.StatsLoad.WorkingColMap[item]; ok {
+	if chList, ok := h.StatsLoad.WorkingColMap[result.Item]; ok {
 		list := chList[1:]
 		for _, ch := range list {
-			h.writeToResultChan(ch, item)
+			h.writeToResultChan(ch, result)
 		}
 	}
-	delete(h.StatsLoad.WorkingColMap, item)
+	delete(h.StatsLoad.WorkingColMap, result.Item)
 }
diff --git a/statistics/handle/handle_hist_test.go b/statistics/handle/handle_hist_test.go
index c3f16503e3..6ac047ac99 100644
--- a/statistics/handle/handle_hist_test.go
+++ b/statistics/handle/handle_hist_test.go
@@ -221,10 +221,10 @@ func TestConcurrentLoadHistWithPanicAndFail(t *testing.T) {
 
 	rs1, ok1 := <-stmtCtx1.StatsLoad.ResultCh
 	require.True(t, ok1)
-	require.Equal(t, neededColumns[0], rs1)
+	require.Equal(t, neededColumns[0], rs1.Item)
 	rs2, ok2 := <-stmtCtx2.StatsLoad.ResultCh
 	require.True(t, ok2)
-	require.Equal(t, neededColumns[0], rs2)
+	require.Equal(t, neededColumns[0], rs2.Item)
 
 	stat = h.GetTableStats(tableInfo)
 	hg = stat.Columns[tableInfo.Columns[2].ID].Histogram