planner: add warn log for sync stats (#36956)
@@ -1296,7 +1296,15 @@ func (a *ExecStmt) LogSlowQuery(txnTS uint64, succ bool, hasMoreResults bool) {
        ExecRetryCount: a.retryCount,
        IsExplicitTxn: sessVars.TxnCtx.IsExplicit,
        IsWriteCacheTable: stmtCtx.WaitLockLeaseTime > 0,
+       IsSyncStatsFailed: stmtCtx.IsSyncStatsFailed,
    }
+   failpoint.Inject("assertSyncStatsFailed", func(val failpoint.Value) {
+       if val.(bool) {
+           if !slowItems.IsSyncStatsFailed {
+               panic("isSyncStatsFailed should be true")
+           }
+       }
+   })
    if a.retryCount > 0 {
        slowItems.ExecRetryTime = costTime - sessVars.DurationParse - sessVars.DurationCompile - time.Since(a.retryStartTime)
    }
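The failpoint.Inject body above only runs when a test enables the assertSyncStatsFailed failpoint (the planner test later in this commit does that via failpoint.Enable); in normal runs it is effectively a no-op. As a rough, dependency-free sketch of the same idea — a plain function hook standing in for the pingcap/failpoint machinery, all names illustrative:

```go
package main

import "fmt"

// assertSyncStatsFailed is nil in normal runs; a test installs a checker,
// which is roughly what failpoint.Enable(..., `return(true)`) does above.
var assertSyncStatsFailed func(failed bool)

func logSlowQuery(isSyncStatsFailed bool) {
    if assertSyncStatsFailed != nil {
        assertSyncStatsFailed(isSyncStatsFailed)
    }
    fmt.Println("slow query logged")
}

func main() {
    // "Enable the failpoint": the statement must have recorded a sync-stats failure.
    assertSyncStatsFailed = func(failed bool) {
        if !failed {
            panic("isSyncStatsFailed should be true")
        }
    }
    logSlowQuery(true)
}
```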
@@ -1925,6 +1925,7 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) {
    sc.EnableOptimizeTrace = false
    sc.OptimizeTracer = nil
    sc.OptimizerCETrace = nil
+   sc.IsSyncStatsFailed = false

    sc.SysdateIsNow = ctx.GetSessionVars().SysdateIsNow
@@ -25,7 +25,9 @@ import (
    "github.com/pingcap/tidb/sessionctx"
    "github.com/pingcap/tidb/sessionctx/stmtctx"
    "github.com/pingcap/tidb/sessionctx/variable"
+   "github.com/pingcap/tidb/util/logutil"
    "github.com/pingcap/tidb/util/mathutil"
+   "go.uber.org/zap"
)

type collectPredicateColumnsPoint struct{}

@@ -41,6 +43,9 @@ func (c collectPredicateColumnsPoint) optimize(_ context.Context, plan LogicalPl
    if len(predicateColumns) > 0 {
        plan.SCtx().UpdateColStatsUsage(predicateColumns)
    }
    if !histNeeded {
        return plan, nil
    }
    histNeededIndices := collectSyncIndices(plan.SCtx(), histNeededColumns)
    histNeededItems := collectHistNeededItems(histNeededColumns, histNeededIndices)
    if histNeeded && len(histNeededItems) > 0 {

@@ -85,6 +90,8 @@ func RequestLoadStats(ctx sessionctx.Context, neededHistItems []model.TableItemI
    var timeout = time.Duration(waitTime)
    err := domain.GetDomain(ctx).StatsHandle().SendLoadRequests(stmtCtx, neededHistItems, timeout)
    if err != nil {
+       logutil.BgLogger().Warn("SendLoadRequests failed", zap.Error(err))
+       stmtCtx.IsSyncStatsFailed = true
        return handleTimeout(stmtCtx)
    }
    return nil

@@ -100,6 +107,8 @@ func SyncWaitStatsLoad(plan LogicalPlan) (bool, error) {
    if success {
        return true, nil
    }
+   logutil.BgLogger().Warn("SyncWaitStatsLoad failed")
+   stmtCtx.IsSyncStatsFailed = true
    err := handleTimeout(stmtCtx)
    return false, err
}
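The pattern in both functions above is "warn, flag, then fall back": on failure the planner logs a warning, records IsSyncStatsFailed on the statement context, and lets handleTimeout decide (based on tidb_stats_load_pseudo_timeout, as exercised by the test below) whether the statement fails or continues on pseudo stats. A minimal stand-alone sketch of that control flow, with stmtCtx and handleTimeout here being local stand-ins:

```go
package main

import (
    "errors"
    "fmt"
    "log"
)

// Illustrative stand-ins for StatementContext and the pseudo-timeout decision.
type stmtCtx struct {
    IsSyncStatsFailed bool
    PseudoTimeout     bool // mirrors tidb_stats_load_pseudo_timeout
}

func handleTimeout(sc *stmtCtx) error {
    if sc.PseudoTimeout {
        return nil // keep planning with pseudo/partial stats
    }
    return errors.New("sync load stats timeout")
}

// requestLoadStats mirrors RequestLoadStats above: a failed request is warned
// about and recorded on the statement context instead of being silently lost.
func requestLoadStats(sc *stmtCtx, sendErr error) error {
    if sendErr != nil {
        log.Printf("warn: SendLoadRequests failed: %v", sendErr)
        sc.IsSyncStatsFailed = true
        return handleTimeout(sc)
    }
    return nil
}

func main() {
    sc := &stmtCtx{PseudoTimeout: true}
    err := requestLoadStats(sc, errors.New("load queue full"))
    fmt.Printf("err=%v, IsSyncStatsFailed=%v\n", err, sc.IsSyncStatsFailed)
}
```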
@@ -20,6 +20,7 @@ import (
    "testing"
    "time"

+   "github.com/pingcap/failpoint"
    "github.com/pingcap/tidb/config"
    "github.com/pingcap/tidb/executor"
    "github.com/pingcap/tidb/parser"

@@ -27,7 +28,9 @@ import (
    "github.com/pingcap/tidb/planner"
    plannercore "github.com/pingcap/tidb/planner/core"
    "github.com/pingcap/tidb/sessionctx"
+   "github.com/pingcap/tidb/sessionctx/stmtctx"
    "github.com/pingcap/tidb/statistics"
    "github.com/pingcap/tidb/statistics/handle"
    "github.com/pingcap/tidb/testkit"
    "github.com/stretchr/testify/require"
)

@@ -259,15 +262,26 @@ func TestPlanStatsLoadTimeout(t *testing.T) {
    require.NoError(t, err)
    tableInfo := tbl.Meta()
    neededColumn := model.TableItemID{TableID: tableInfo.ID, ID: tableInfo.Columns[0].ID, IsIndex: false}
-   resultCh := make(chan model.TableItemID, 1)
+   resultCh := make(chan stmtctx.StatsLoadResult, 1)
    timeout := time.Duration(1<<63 - 1)
-   dom.StatsHandle().AppendNeededItem(neededColumn, resultCh, timeout) // make channel queue full
-   stmt, err := p.ParseOneStmt("select * from t where c>1", "", "")
+   task := &handle.NeededItemTask{
+       TableItemID: neededColumn,
+       ResultCh: resultCh,
+       ToTimeout: time.Now().Local().Add(timeout),
+   }
+   dom.StatsHandle().AppendNeededItem(task, timeout) // make channel queue full
+   sql := "select * from t where c>1"
+   stmt, err := p.ParseOneStmt(sql, "", "")
    require.NoError(t, err)
    tk.MustExec("set global tidb_stats_load_pseudo_timeout=false")
    _, _, err = planner.Optimize(context.TODO(), ctx, stmt, is)
    require.Error(t, err) // fail sql for timeout when pseudo=false

    tk.MustExec("set global tidb_stats_load_pseudo_timeout=true")
+   require.NoError(t, failpoint.Enable("github.com/pingcap/executor/assertSyncStatsFailed", `return(true)`))
+   tk.MustExec(sql) // not fail sql for timeout when pseudo=true
+   failpoint.Disable("github.com/pingcap/executor/assertSyncStatsFailed")

    plan, _, err := planner.Optimize(context.TODO(), ctx, stmt, is)
    require.NoError(t, err) // not fail sql for timeout when pseudo=true
    switch pp := plan.(type) {
@@ -15,6 +15,7 @@
package stmtctx

import (
+   "bytes"
    "encoding/json"
    "math"
    "strconv"

@@ -293,7 +294,7 @@ type StatementContext struct {
    // NeededItems stores the columns/indices whose stats are needed for planner.
    NeededItems []model.TableItemID
    // ResultCh to receive stats loading results
-   ResultCh chan model.TableItemID
+   ResultCh chan StatsLoadResult
    // Fallback indicates if the planner uses full-loaded stats or fallback all to pseudo/simple.
    Fallback bool
    // LoadStartTime is to record the load start time to calculate latency

@@ -310,6 +311,9 @@ type StatementContext struct {
    IsSQLRegistered atomic2.Bool
    // IsSQLAndPlanRegistered uses to indicate whether the SQL and plan has been registered for TopSQL.
    IsSQLAndPlanRegistered atomic2.Bool
+
+   // IsSyncStatsFailed indicates whether any failure happened during sync stats
+   IsSyncStatsFailed bool
}

// StmtHints are SessionVars related sql hints.

@@ -1014,3 +1018,30 @@ func (d *CopTasksDetails) ToZapFields() (fields []zap.Field) {
    fields = append(fields, zap.String("wait_max_addr", d.MaxWaitAddress))
    return fields
}
+
+// StatsLoadResult indicates result for StatsLoad
+type StatsLoadResult struct {
+   Item  model.TableItemID
+   Error error
+}
+
+// HasError returns whether result has error
+func (r StatsLoadResult) HasError() bool {
+   return r.Error != nil
+}
+
+// ErrorMsg returns StatsLoadResult err msg
+func (r StatsLoadResult) ErrorMsg() string {
+   if r.Error == nil {
+       return ""
+   }
+   b := bytes.NewBufferString("tableID:")
+   b.WriteString(strconv.FormatInt(r.Item.TableID, 10))
+   b.WriteString(", id:")
+   b.WriteString(strconv.FormatInt(r.Item.ID, 10))
+   b.WriteString(", isIndex:")
+   b.WriteString(strconv.FormatBool(r.Item.IsIndex))
+   b.WriteString(", err:")
+   b.WriteString(r.Error.Error())
+   return b.String()
+}
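A minimal, self-contained sketch of how the new result type travels over the stats-load channel: the worker always reports the item back, attaching an error when loading failed, and the waiter collects the messages for a single warn log. The local types below stand in for model.TableItemID and stmtctx.StatsLoadResult so the snippet runs on its own:

```go
package main

import (
    "errors"
    "fmt"
)

// Stand-ins for model.TableItemID and stmtctx.StatsLoadResult.
type tableItemID struct {
    TableID, ID int64
    IsIndex     bool
}

type statsLoadResult struct {
    Item  tableItemID
    Error error
}

func (r statsLoadResult) HasError() bool { return r.Error != nil }

func (r statsLoadResult) ErrorMsg() string {
    if r.Error == nil {
        return ""
    }
    return fmt.Sprintf("tableID:%d, id:%d, isIndex:%v, err:%s",
        r.Item.TableID, r.Item.ID, r.Item.IsIndex, r.Error.Error())
}

func main() {
    resultCh := make(chan statsLoadResult, 2)
    // Worker side: report success and failure through the same channel.
    resultCh <- statsLoadResult{Item: tableItemID{TableID: 1, ID: 10}}
    resultCh <- statsLoadResult{Item: tableItemID{TableID: 1, ID: 11}, Error: errors.New("read stats failed")}

    // Waiter side: collect messages instead of silently dropping them.
    var errorMsgs []string
    for i := 0; i < 2; i++ {
        if r := <-resultCh; r.HasError() {
            errorMsgs = append(errorMsgs, r.ErrorMsg())
        }
    }
    fmt.Println(errorMsgs)
}
```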
@@ -2526,6 +2526,8 @@ const (
    SlowLogIsExplicitTxn = "IsExplicitTxn"
    // SlowLogIsWriteCacheTable is used to indicate whether writing to the cache table need to wait for the read lock to expire.
    SlowLogIsWriteCacheTable = "IsWriteCacheTable"
+   // SlowLogIsSyncStatsFailed is used to indicate whether any failure happen during sync stats
+   SlowLogIsSyncStatsFailed = "IsSyncStatsFailed"
)

// GenerateBinaryPlan decides whether we should record binary plan in slow log and stmt summary.

@@ -2568,6 +2570,7 @@ type SlowQueryLogItems struct {
    ResultRows int64
    IsExplicitTxn bool
    IsWriteCacheTable bool
+   IsSyncStatsFailed bool
}

// SlowLogFormat uses for formatting slow log.

@@ -2732,6 +2735,7 @@ func (s *SessionVars) SlowLogFormat(logItems *SlowQueryLogItems) string {
    writeSlowLogItem(&buf, SlowLogResultRows, strconv.FormatInt(logItems.ResultRows, 10))
    writeSlowLogItem(&buf, SlowLogSucc, strconv.FormatBool(logItems.Succ))
    writeSlowLogItem(&buf, SlowLogIsExplicitTxn, strconv.FormatBool(logItems.IsExplicitTxn))
+   writeSlowLogItem(&buf, SlowLogIsSyncStatsFailed, strconv.FormatBool(logItems.IsSyncStatsFailed))
    if s.StmtCtx.WaitLockLeaseTime > 0 {
        writeSlowLogItem(&buf, SlowLogIsWriteCacheTable, strconv.FormatBool(logItems.IsWriteCacheTable))
    }
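For reference, a small sketch of how the "# Field: value" slow-log layout renders the new item (field names taken from the constants above; the writeSlowLogItem helper here is a local stand-in for the real one):

```go
package main

import (
    "bytes"
    "fmt"
    "strconv"
)

// writeSlowLogItem mirrors the "# <field>: <value>" layout used by SlowLogFormat.
func writeSlowLogItem(buf *bytes.Buffer, key, value string) {
    fmt.Fprintf(buf, "# %s: %s\n", key, value)
}

func main() {
    var buf bytes.Buffer
    writeSlowLogItem(&buf, "Succ", strconv.FormatBool(true))
    writeSlowLogItem(&buf, "IsExplicitTxn", strconv.FormatBool(true))
    writeSlowLogItem(&buf, "IsSyncStatsFailed", strconv.FormatBool(false))
    fmt.Print(buf.String())
}
```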
@@ -232,6 +232,7 @@ func TestSlowLogFormat(t *testing.T) {
# Result_rows: 12345
# Succ: true
# IsExplicitTxn: true
+# IsSyncStatsFailed: false
# IsWriteCacheTable: true`
    sql := "select * from t;"
    _, digest := parser.NormalizeDigest(sql)
@@ -971,7 +971,7 @@ const (
    DefTiDBPersistAnalyzeOptions = true
    DefTiDBEnableColumnTracking = false
    DefTiDBStatsLoadSyncWait = 0
-   DefTiDBStatsLoadPseudoTimeout = false
+   DefTiDBStatsLoadPseudoTimeout = true
    DefSysdateIsNow = false
    DefTiDBEnableMutationChecker = false
    DefTiDBTxnAssertionLevel = AssertionOffStr
@@ -215,7 +215,7 @@ func NewHandle(ctx sessionctx.Context, lease time.Duration, pool sessionPool, tr
    handle.StatsLoad.SubCtxs = make([]sessionctx.Context, cfg.Performance.StatsLoadConcurrency)
    handle.StatsLoad.NeededItemsCh = make(chan *NeededItemTask, cfg.Performance.StatsLoadQueueSize)
    handle.StatsLoad.TimeoutItemsCh = make(chan *NeededItemTask, cfg.Performance.StatsLoadQueueSize)
-   handle.StatsLoad.WorkingColMap = map[model.TableItemID][]chan model.TableItemID{}
+   handle.StatsLoad.WorkingColMap = map[model.TableItemID][]chan stmtctx.StatsLoadResult{}
    err := handle.RefreshVars()
    if err != nil {
        return nil, err
@@ -46,14 +46,14 @@ type StatsLoad struct {
    SubCtxs []sessionctx.Context
    NeededItemsCh chan *NeededItemTask
    TimeoutItemsCh chan *NeededItemTask
-   WorkingColMap map[model.TableItemID][]chan model.TableItemID
+   WorkingColMap map[model.TableItemID][]chan stmtctx.StatsLoadResult
}

// NeededItemTask represents one needed column/indices with expire time.
type NeededItemTask struct {
    TableItemID model.TableItemID
    ToTimeout time.Time
-   ResultCh chan model.TableItemID
+   ResultCh chan stmtctx.StatsLoadResult
}

// SendLoadRequests send neededColumns requests

@@ -64,9 +64,14 @@ func (h *Handle) SendLoadRequests(sc *stmtctx.StatementContext, neededHistItems
    }
    sc.StatsLoad.Timeout = timeout
    sc.StatsLoad.NeededItems = remainedItems
-   sc.StatsLoad.ResultCh = make(chan model.TableItemID, len(remainedItems))
-   for _, col := range remainedItems {
-       err := h.AppendNeededItem(col, sc.StatsLoad.ResultCh, timeout)
+   sc.StatsLoad.ResultCh = make(chan stmtctx.StatsLoadResult, len(remainedItems))
+   for _, item := range remainedItems {
+       task := &NeededItemTask{
+           TableItemID: item,
+           ToTimeout: time.Now().Local().Add(timeout),
+           ResultCh: sc.StatsLoad.ResultCh,
+       }
+       err := h.AppendNeededItem(task, timeout)
        if err != nil {
            return err
        }
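Callers now build the NeededItemTask themselves, and AppendNeededItem still enqueues it through writeToChanWithTimeout, so a full request queue surfaces as an error after the timeout instead of blocking the optimizer. A minimal sketch of that bounded-queue behaviour, with all names local to the example:

```go
package main

import (
    "errors"
    "fmt"
    "time"
)

type task struct{ id int64 }

// enqueueWithTimeout mirrors the shape of writeToChanWithTimeout/AppendNeededItem:
// the request queue is bounded, so a full queue turns into an error after
// `timeout` rather than stalling planning indefinitely.
func enqueueWithTimeout(ch chan *task, t *task, timeout time.Duration) error {
    select {
    case ch <- t:
        return nil
    case <-time.After(timeout):
        return errors.New("stats load request queue is full")
    }
}

func main() {
    queue := make(chan *task, 1)
    fmt.Println(enqueueWithTimeout(queue, &task{id: 1}, 10*time.Millisecond)) // <nil>
    fmt.Println(enqueueWithTimeout(queue, &task{id: 2}, 10*time.Millisecond)) // queue-full error
}
```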
@@ -80,7 +85,12 @@ func (h *Handle) SyncWaitStatsLoad(sc *stmtctx.StatementContext) bool {
    if len(sc.StatsLoad.NeededItems) <= 0 {
        return true
    }
+   var errorMsgs []string
    defer func() {
+       if len(errorMsgs) > 0 {
+           logutil.BgLogger().Warn("SyncWaitStatsLoad meets error",
+               zap.Strings("errors", errorMsgs))
+       }
        sc.StatsLoad.NeededItems = nil
    }()
    resultCheckMap := map[model.TableItemID]struct{}{}

@@ -94,7 +104,10 @@ func (h *Handle) SyncWaitStatsLoad(sc *stmtctx.StatementContext) bool {
    select {
    case result, ok := <-sc.StatsLoad.ResultCh:
        if ok {
-           delete(resultCheckMap, result)
+           if result.HasError() {
+               errorMsgs = append(errorMsgs, result.ErrorMsg())
+           }
+           delete(resultCheckMap, result.Item)
            if len(resultCheckMap) == 0 {
                metrics.SyncLoadHistogram.Observe(float64(time.Since(sc.StatsLoad.LoadStartTime).Milliseconds()))
                return true

@@ -104,6 +117,7 @@ func (h *Handle) SyncWaitStatsLoad(sc *stmtctx.StatementContext) bool {
        }
    case <-timer.C:
        metrics.SyncLoadTimeoutCounter.Inc()
+       logutil.BgLogger().Warn("SyncWaitStatsLoad timeout")
        return false
    }
}
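A self-contained sketch of the wait-loop shape used by SyncWaitStatsLoad: drain the result channel until every requested item has reported back, or give up when the timer fires; the local types stand in for the TiDB ones:

```go
package main

import (
    "fmt"
    "time"
)

type itemID struct{ TableID, ID int64 }

type loadResult struct {
    Item itemID
    Err  error
}

// waitAll keeps receiving until the pending set is empty or the timeout hits,
// which is when the real code logs "SyncWaitStatsLoad timeout" and falls back.
func waitAll(resultCh <-chan loadResult, needed []itemID, timeout time.Duration) bool {
    pending := make(map[itemID]struct{}, len(needed))
    for _, it := range needed {
        pending[it] = struct{}{}
    }
    timer := time.NewTimer(timeout)
    defer timer.Stop()
    for {
        select {
        case r, ok := <-resultCh:
            if !ok {
                return false
            }
            delete(pending, r.Item)
            if len(pending) == 0 {
                return true
            }
        case <-timer.C:
            return false
        }
    }
}

func main() {
    items := []itemID{{1, 10}, {1, 11}}
    ch := make(chan loadResult, len(items))
    for _, it := range items {
        ch <- loadResult{Item: it}
    }
    fmt.Println(waitAll(ch, items, 50*time.Millisecond)) // true
}
```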
@@ -134,9 +148,7 @@ func (h *Handle) removeHistLoadedColumns(neededItems []model.TableItemID) []mode
}

// AppendNeededItem appends needed columns/indices to ch, if exists, do not append the duplicated one.
-func (h *Handle) AppendNeededItem(item model.TableItemID, resultCh chan model.TableItemID, timeout time.Duration) error {
-   toTimout := time.Now().Local().Add(timeout)
-   task := &NeededItemTask{TableItemID: item, ToTimeout: toTimout, ResultCh: resultCh}
+func (h *Handle) AppendNeededItem(task *NeededItemTask, timeout time.Duration) error {
    return h.writeToChanWithTimeout(h.StatsLoad.NeededItemsCh, task, timeout)
}

@@ -202,11 +214,12 @@ func (h *Handle) HandleOneTask(lastTask *NeededItemTask, readerCtx *StatsReaderC
}

func (h *Handle) handleOneItemTask(task *NeededItemTask, readerCtx *StatsReaderContext, ctx sqlexec.RestrictedSQLExecutor) (*NeededItemTask, error) {
-   item := task.TableItemID
+   result := stmtctx.StatsLoadResult{Item: task.TableItemID}
+   item := result.Item
    oldCache := h.statsCache.Load().(statsCache)
    tbl, ok := oldCache.Get(item.TableID)
    if !ok {
-       h.writeToResultChan(task.ResultCh, item)
+       h.writeToResultChan(task.ResultCh, result)
        return nil, nil
    }
    var err error

@@ -214,22 +227,22 @@ func (h *Handle) handleOneItemTask(task *NeededItemTask, readerCtx *StatsReaderC
    if item.IsIndex {
        index, ok := tbl.Indices[item.ID]
        if !ok || index.IsFullLoad() {
-           h.writeToResultChan(task.ResultCh, item)
+           h.writeToResultChan(task.ResultCh, result)
            return nil, nil
        }
        wrapper.idx = index
    } else {
        col, ok := tbl.Columns[item.ID]
        if !ok || col.IsFullLoad() {
-           h.writeToResultChan(task.ResultCh, item)
+           h.writeToResultChan(task.ResultCh, result)
            return nil, nil
        }
        wrapper.col = col
    }
    // to avoid duplicated handling in concurrent scenario
-   working := h.setWorking(task.TableItemID, task.ResultCh)
+   working := h.setWorking(result.Item, task.ResultCh)
    if !working {
-       h.writeToResultChan(task.ResultCh, item)
+       h.writeToResultChan(task.ResultCh, result)
        return nil, nil
    }
    // refresh statsReader to get latest stats

@@ -238,6 +251,7 @@ func (h *Handle) handleOneItemTask(task *NeededItemTask, readerCtx *StatsReaderC
    needUpdate := false
    wrapper, err = h.readStatsForOneItem(item, wrapper, readerCtx.reader)
    if err != nil {
+       result.Error = err
        return task, err
    }
    if item.IsIndex {

@@ -251,9 +265,9 @@ func (h *Handle) handleOneItemTask(task *NeededItemTask, readerCtx *StatsReaderC
    }
    metrics.ReadStatsHistogram.Observe(float64(time.Since(t).Milliseconds()))
    if needUpdate && h.updateCachedItem(item, wrapper.col, wrapper.idx) {
-       h.writeToResultChan(task.ResultCh, item)
+       h.writeToResultChan(task.ResultCh, result)
    }
-   h.finishWorking(item)
+   h.finishWorking(result)
    return nil, nil
}

@@ -425,7 +439,7 @@ func (h *Handle) writeToChanWithTimeout(taskCh chan *NeededItemTask, task *Neede
}

// writeToResultChan safe-writes with panic-recover so one write-fail will not have big impact.
-func (h *Handle) writeToResultChan(resultCh chan model.TableItemID, rs model.TableItemID) {
+func (h *Handle) writeToResultChan(resultCh chan stmtctx.StatsLoadResult, rs stmtctx.StatsLoadResult) {
    defer func() {
        if r := recover(); r != nil {
            logutil.BgLogger().Error("writeToResultChan panicked", zap.Any("error", r), zap.Stack("stack"))
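writeToResultChan keeps its panic-recover guard with the new payload type. The core trick, sketched stand-alone: recover from a failed send (for example on an already-closed channel) so one misbehaving waiter cannot crash the stats-load worker.

```go
package main

import (
    "fmt"
    "log"
)

type loadResult struct{ msg string }

// safeSend mirrors writeToResultChan: a panicking send is logged and swallowed
// instead of taking the worker goroutine down.
func safeSend(ch chan loadResult, r loadResult) {
    defer func() {
        if rec := recover(); rec != nil {
            log.Printf("writeToResultChan panicked: %v", rec)
        }
    }()
    ch <- r
}

func main() {
    ch := make(chan loadResult, 1)
    safeSend(ch, loadResult{msg: "ok"})
    fmt.Println(<-ch)
    close(ch)
    safeSend(ch, loadResult{msg: "dropped"}) // recovered and logged, program continues
}
```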
@@ -466,7 +480,7 @@ func (h *Handle) updateCachedItem(item model.TableItemID, colHist *statistics.Co
    return h.updateStatsCache(oldCache.update([]*statistics.Table{tbl}, nil, oldCache.version, WithTableStatsByQuery()))
}

-func (h *Handle) setWorking(item model.TableItemID, resultCh chan model.TableItemID) bool {
+func (h *Handle) setWorking(item model.TableItemID, resultCh chan stmtctx.StatsLoadResult) bool {
    h.StatsLoad.Lock()
    defer h.StatsLoad.Unlock()
    chList, ok := h.StatsLoad.WorkingColMap[item]

@@ -477,20 +491,20 @@ func (h *Handle) setWorking(item model.TableItemID, resultCh chan model.TableIte
        h.StatsLoad.WorkingColMap[item] = append(chList, resultCh)
        return false
    }
-   chList = []chan model.TableItemID{}
+   chList = []chan stmtctx.StatsLoadResult{}
    chList = append(chList, resultCh)
    h.StatsLoad.WorkingColMap[item] = chList
    return true
}

-func (h *Handle) finishWorking(item model.TableItemID) {
+func (h *Handle) finishWorking(result stmtctx.StatsLoadResult) {
    h.StatsLoad.Lock()
    defer h.StatsLoad.Unlock()
-   if chList, ok := h.StatsLoad.WorkingColMap[item]; ok {
+   if chList, ok := h.StatsLoad.WorkingColMap[result.Item]; ok {
        list := chList[1:]
        for _, ch := range list {
-           h.writeToResultChan(ch, item)
+           h.writeToResultChan(ch, result)
        }
    }
-   delete(h.StatsLoad.WorkingColMap, item)
+   delete(h.StatsLoad.WorkingColMap, result.Item)
}
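setWorking/finishWorking implement a simple de-duplication: the first requester of an item becomes the worker, later requesters only park their channel under the same key, and finishWorking fans the result out to them. A stand-alone sketch of that map, with illustrative local types:

```go
package main

import (
    "fmt"
    "sync"
)

type itemID struct{ TableID, ID int64 }
type loadResult struct{ Item itemID }

type workingMap struct {
    sync.Mutex
    m map[itemID][]chan loadResult
}

// setWorking returns true only for the first requester, which performs the load;
// later requesters just register their channel and wait.
func (w *workingMap) setWorking(it itemID, ch chan loadResult) bool {
    w.Lock()
    defer w.Unlock()
    if chs, ok := w.m[it]; ok {
        w.m[it] = append(chs, ch)
        return false
    }
    w.m[it] = []chan loadResult{ch}
    return true
}

// finishWorking notifies every waiter except the worker itself (index 0),
// which already received the result directly, then clears the entry.
func (w *workingMap) finishWorking(r loadResult) {
    w.Lock()
    defer w.Unlock()
    if chs, ok := w.m[r.Item]; ok {
        for _, ch := range chs[1:] {
            ch <- r
        }
    }
    delete(w.m, r.Item)
}

func main() {
    w := &workingMap{m: map[itemID][]chan loadResult{}}
    it := itemID{TableID: 1, ID: 10}
    ch1, ch2 := make(chan loadResult, 1), make(chan loadResult, 1)
    fmt.Println(w.setWorking(it, ch1)) // true: ch1's owner performs the load
    fmt.Println(w.setWorking(it, ch2)) // false: duplicate request, just waits
    w.finishWorking(loadResult{Item: it})
    fmt.Println(<-ch2) // the waiting requester is still notified
}
```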
@@ -221,10 +221,10 @@ func TestConcurrentLoadHistWithPanicAndFail(t *testing.T) {

    rs1, ok1 := <-stmtCtx1.StatsLoad.ResultCh
    require.True(t, ok1)
-   require.Equal(t, neededColumns[0], rs1)
+   require.Equal(t, neededColumns[0], rs1.Item)
    rs2, ok2 := <-stmtCtx2.StatsLoad.ResultCh
    require.True(t, ok2)
-   require.Equal(t, neededColumns[0], rs2)
+   require.Equal(t, neededColumns[0], rs2.Item)

    stat = h.GetTableStats(tableInfo)
    hg = stat.Columns[tableInfo.Columns[2].ID].Histogram