*: move StmtCtx.ErrAutoincReadFailedAsWarning to errctx (#49992)

close pingcap/tidb#49991
This commit is contained in:
王超
2024-01-03 17:56:02 +08:00
committed by GitHub
parent 8645716b91
commit 383105433b
8 changed files with 159 additions and 55 deletions

View File

@ -33,12 +33,25 @@ const (
LevelIgnore
)
// LevelMap indicates the map from `ErrGroup` to `Level`
type LevelMap [errGroupCount]Level
// Context defines how to handle an error
type Context struct {
levelMap [errGroupCount]Level
levelMap LevelMap
warnHandler contextutil.WarnHandler
}
// LevelMap returns the `levelMap` of the context.
func (ctx *Context) LevelMap() LevelMap {
return ctx.levelMap
}
// LevelForGroup returns the level for a specified group.
func (ctx *Context) LevelForGroup(errGroup ErrGroup) Level {
return ctx.levelMap[errGroup]
}
// WithStrictErrGroupLevel makes the context to return the error directly for any kinds of errors.
func (ctx *Context) WithStrictErrGroupLevel() Context {
newCtx := Context{
@ -59,6 +72,14 @@ func (ctx *Context) WithErrGroupLevel(eg ErrGroup, l Level) Context {
return newCtx
}
// WithErrGroupLevels sets `levelMap` for an `ErrGroup`
func (ctx *Context) WithErrGroupLevels(levels LevelMap) Context {
return Context{
levelMap: levels,
warnHandler: ctx.warnHandler,
}
}
// appendWarning appends the error to warning. If the inner `warnHandler` is nil, do nothing.
func (ctx *Context) appendWarning(err error) {
intest.Assert(ctx.warnHandler != nil)
@ -136,9 +157,15 @@ func (ctx *Context) HandleErrorWithAlias(internalErr error, err error, warnErr e
// NewContext creates an error context to handle the errors and warnings
func NewContext(handler contextutil.WarnHandler) Context {
return NewContextWithLevels(LevelMap{}, handler)
}
// NewContextWithLevels creates an error context to handle the errors and warnings
func NewContextWithLevels(levels LevelMap, handler contextutil.WarnHandler) Context {
intest.Assert(handler != nil)
return Context{
warnHandler: handler,
levelMap: levels,
}
}
@ -153,9 +180,14 @@ type ErrGroup int
const (
// ErrGroupTruncate is the group of truncated errors
ErrGroupTruncate ErrGroup = iota
// ErrGroupOverflow is the group of overflow errors
ErrGroupOverflow
// ErrGroupDupKey is the group of duplicate key errors
ErrGroupDupKey
// ErrGroupBadNull is the group of bad null errors
ErrGroupBadNull
// ErrGroupDividedByZero is the group of divided by zero errors
ErrGroupDividedByZero
// ErrGroupAutoIncReadFailed is the group of auto increment read failed errors
ErrGroupAutoIncReadFailed
// errGroupCount is the count of all `ErrGroup`. Please leave it at the end of the list.
errGroupCount
)
@ -176,4 +208,6 @@ func init() {
for _, errCode := range truncateErrCodes {
errGroupMap[errCode] = ErrGroupTruncate
}
errGroupMap[errno.ErrAutoincReadFailed] = ErrGroupAutoIncReadFailed
}

View File

@ -44,6 +44,15 @@ func TestContext(t *testing.T) {
// newCtx will handle the error as a warn
require.NoError(t, newCtx.HandleErrorWithAlias(testInternalErr, testErr, testWarn))
require.Equal(t, warn, testWarn)
levels := newCtx.LevelMap()
for i := errctx.ErrGroup(0); i <= errctx.ErrGroup(len(levels)-1); i++ {
if i == errctx.ErrGroupTruncate {
require.Equal(t, errctx.LevelWarn, levels[i])
} else {
require.Equal(t, errctx.LevelError, levels[i])
require.Equal(t, levels[i], newCtx.LevelForGroup(i))
}
}
warn = nil
newCtx2 := newCtx.WithStrictErrGroupLevel()
@ -52,6 +61,7 @@ func TestContext(t *testing.T) {
require.Equal(t, warn, testWarn)
// newCtx2 will return all errors
require.Equal(t, newCtx2.HandleErrorWithAlias(testInternalErr, testErr, testWarn), testErr)
require.Equal(t, errctx.LevelMap{}, newCtx2.LevelMap())
// test `multierr`
testErrs := multierr.Append(testInternalErr, testErr)
@ -61,4 +71,22 @@ func TestContext(t *testing.T) {
// test nil
require.Nil(t, ctx.HandleError(nil))
// test with a level map
levels = errctx.LevelMap{}
levels[errctx.ErrGroupAutoIncReadFailed] = errctx.LevelWarn
ctx = errctx.NewContextWithLevels(levels, contextutil.NewFuncWarnHandlerForTest(func(err error) {
warn = err
}))
require.Equal(t, levels, ctx.LevelMap())
levels2 := errctx.LevelMap{}
levels2[errctx.ErrGroupAutoIncReadFailed] = errctx.LevelIgnore
ctx = ctx.WithErrGroupLevels(levels2)
require.Equal(t, levels2, ctx.LevelMap())
// original levels should not change
ctx = ctx.WithErrGroupLevels(errctx.LevelMap{})
require.Equal(t, errctx.LevelMap{}, ctx.LevelMap())
require.Equal(t, errctx.LevelWarn, levels[errctx.ErrGroupAutoIncReadFailed])
require.Equal(t, errctx.LevelIgnore, levels2[errctx.ErrGroupAutoIncReadFailed])
}

View File

@ -112,6 +112,7 @@ go_library(
"//pkg/domain",
"//pkg/domain/infosync",
"//pkg/domain/resourcegroup",
"//pkg/errctx",
"//pkg/errno",
"//pkg/executor/aggfuncs",
"//pkg/executor/aggregate",

View File

@ -37,6 +37,7 @@ import (
"github.com/pingcap/tidb/pkg/ddl/schematracker"
"github.com/pingcap/tidb/pkg/domain"
"github.com/pingcap/tidb/pkg/domain/infosync"
"github.com/pingcap/tidb/pkg/errctx"
"github.com/pingcap/tidb/pkg/executor/aggregate"
"github.com/pingcap/tidb/pkg/executor/internal/exec"
"github.com/pingcap/tidb/pkg/executor/internal/pdhelper"
@ -2112,15 +2113,19 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) {
ResetDeleteStmtCtx(sc, stmt, vars)
case *ast.InsertStmt:
sc.InInsertStmt = true
var errLevels errctx.LevelMap
// For insert statement (not for update statement), disabling the StrictSQLMode
// should make TruncateAsWarning and DividedByZeroAsWarning,
// but should not make DupKeyAsWarning.
sc.DupKeyAsWarning = stmt.IgnoreErr
sc.BadNullAsWarning = !vars.StrictSQLMode || stmt.IgnoreErr
sc.IgnoreNoPartition = stmt.IgnoreErr
sc.ErrAutoincReadFailedAsWarning = stmt.IgnoreErr
if stmt.IgnoreErr {
errLevels[errctx.ErrGroupAutoIncReadFailed] = errctx.LevelWarn
}
sc.DividedByZeroAsWarning = !vars.StrictSQLMode || stmt.IgnoreErr
sc.Priority = stmt.Priority
sc.SetErrLevels(errLevels)
sc.SetTypeFlags(sc.TypeFlags().
WithTruncateAsWarning(!vars.StrictSQLMode || stmt.IgnoreErr).
WithIgnoreInvalidDateErr(vars.SQLMode.HasAllowInvalidDatesMode()).

View File

@ -308,11 +308,9 @@ func (e *InsertExec) Next(ctx context.Context, req *chunk.Chunk) error {
err := insertRows(ctx, e)
if err != nil {
terr, ok := errors.Cause(err).(*terror.Error)
if ok && len(e.OnDuplicate) == 0 &&
e.Ctx().GetSessionVars().StmtCtx.ErrAutoincReadFailedAsWarning &&
terr.Code() == errno.ErrAutoincReadFailed {
e.Ctx().GetSessionVars().StmtCtx.AppendWarning(err)
return nil
if ok && len(e.OnDuplicate) == 0 && terr.Code() == errno.ErrAutoincReadFailed {
ec := e.Ctx().GetSessionVars().StmtCtx.ErrCtx()
return ec.HandleError(err)
}
return err
}

View File

@ -14,6 +14,7 @@ go_library(
"//pkg/parser/mysql",
"//pkg/parser/terror",
"//pkg/types",
"//pkg/util/context",
"//pkg/util/disk",
"//pkg/util/execdetails",
"//pkg/util/intest",
@ -43,6 +44,7 @@ go_test(
flaky = True,
shard_count = 12,
deps = [
"//pkg/errctx",
"//pkg/kv",
"//pkg/sessionctx/variable",
"//pkg/testkit",

View File

@ -36,6 +36,7 @@ import (
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/parser/terror"
"github.com/pingcap/tidb/pkg/types"
contextutil "github.com/pingcap/tidb/pkg/util/context"
"github.com/pingcap/tidb/pkg/util/disk"
"github.com/pingcap/tidb/pkg/util/execdetails"
"github.com/pingcap/tidb/pkg/util/intest"
@ -171,32 +172,31 @@ type StatementContext struct {
// IsDDLJobInQueue is used to mark whether the DDL job is put into the queue.
// If IsDDLJobInQueue is true, it means the DDL job is in the queue of storage, and it can be handled by the DDL worker.
IsDDLJobInQueue bool
DDLJobID int64
InInsertStmt bool
InUpdateStmt bool
InDeleteStmt bool
InSelectStmt bool
InLoadDataStmt bool
InExplainStmt bool
InExplainAnalyzeStmt bool
ExplainFormat string
InCreateOrAlterStmt bool
InSetSessionStatesStmt bool
InPreparedPlanBuilding bool
DupKeyAsWarning bool
BadNullAsWarning bool
DividedByZeroAsWarning bool
ErrAutoincReadFailedAsWarning bool
InShowWarning bool
UseCache bool
ForcePlanCache bool // force the optimizer to use plan cache even if there is risky optimization, see #49736.
CacheType PlanCacheType
BatchCheck bool
InNullRejectCheck bool
IgnoreNoPartition bool
IgnoreExplainIDSuffix bool
MultiSchemaInfo *model.MultiSchemaInfo
IsDDLJobInQueue bool
DDLJobID int64
InInsertStmt bool
InUpdateStmt bool
InDeleteStmt bool
InSelectStmt bool
InLoadDataStmt bool
InExplainStmt bool
InExplainAnalyzeStmt bool
ExplainFormat string
InCreateOrAlterStmt bool
InSetSessionStatesStmt bool
InPreparedPlanBuilding bool
DupKeyAsWarning bool
BadNullAsWarning bool
DividedByZeroAsWarning bool
InShowWarning bool
UseCache bool
ForcePlanCache bool // force the optimizer to use plan cache even if there is risky optimization, see #49736.
CacheType PlanCacheType
BatchCheck bool
InNullRejectCheck bool
IgnoreNoPartition bool
IgnoreExplainIDSuffix bool
MultiSchemaInfo *model.MultiSchemaInfo
// If the select statement was like 'select * from t as of timestamp ...' or in a stale read transaction
// or is affected by the tidb_read_staleness session variable, then the statement will be makred as isStaleness
// in stmtCtx
@ -443,17 +443,17 @@ func NewStmtCtxWithTimeZone(tz *time.Location) *StatementContext {
ctxID: stmtCtxIDGenerator.Add(1),
}
sc.typeCtx = types.NewContext(types.DefaultStmtFlags, tz, sc)
sc.initErrCtx()
sc.errCtx = newErrCtx(sc.typeCtx, errctx.LevelMap{}, sc)
return sc
}
// Reset resets a statement context
func (sc *StatementContext) Reset() {
*sc = StatementContext{
ctxID: stmtCtxIDGenerator.Add(1),
typeCtx: types.NewContext(types.DefaultStmtFlags, time.UTC, sc),
ctxID: stmtCtxIDGenerator.Add(1),
}
sc.initErrCtx()
sc.typeCtx = types.NewContext(types.DefaultStmtFlags, time.UTC, sc)
sc.errCtx = newErrCtx(sc.typeCtx, errctx.LevelMap{}, sc)
}
// CtxID returns the context id of the statement
@ -482,22 +482,17 @@ func (sc *StatementContext) TypeCtx() types.Context {
return sc.typeCtx
}
func (sc *StatementContext) initErrCtx() {
ctx := errctx.NewContext(sc)
if sc.TypeFlags().IgnoreTruncateErr() {
ctx = ctx.WithErrGroupLevel(errctx.ErrGroupTruncate, errctx.LevelIgnore)
} else if sc.TypeFlags().TruncateAsWarning() {
ctx = ctx.WithErrGroupLevel(errctx.ErrGroupTruncate, errctx.LevelWarn)
}
sc.errCtx = ctx
}
// ErrCtx returns the error context
func (sc *StatementContext) ErrCtx() errctx.Context {
return sc.errCtx
}
// SetErrLevels sets the error levels for statement
// The argument otherLevels is used to set the error levels except truncate
func (sc *StatementContext) SetErrLevels(otherLevels errctx.LevelMap) {
sc.errCtx = newErrCtx(sc.typeCtx, otherLevels, sc)
}
// TypeFlags returns the type flags
func (sc *StatementContext) TypeFlags() types.Flags {
return sc.typeCtx.Flags()
@ -506,7 +501,7 @@ func (sc *StatementContext) TypeFlags() types.Flags {
// SetTypeFlags sets the type flags
func (sc *StatementContext) SetTypeFlags(flags types.Flags) {
sc.typeCtx = sc.typeCtx.WithFlags(flags)
sc.initErrCtx()
sc.errCtx = newErrCtx(sc.typeCtx, sc.errCtx.LevelMap(), sc)
}
// HandleTruncate ignores or returns the error based on the TypeContext inside.
@ -1415,6 +1410,18 @@ func (sc *StatementContext) TypeCtxOrDefault() types.Context {
return types.DefaultStmtNoWarningContext
}
func newErrCtx(tc types.Context, otherLevels errctx.LevelMap, handler contextutil.WarnHandler) errctx.Context {
l := errctx.LevelError
if flags := tc.Flags(); flags.IgnoreTruncateErr() {
l = errctx.LevelIgnore
} else if flags.TruncateAsWarning() {
l = errctx.LevelWarn
}
otherLevels[errctx.ErrGroupTruncate] = l
return errctx.NewContextWithLevels(otherLevels, handler)
}
// UsedStatsInfoForTable records stats that are used during query and their information.
type UsedStatsInfoForTable struct {
Name string

View File

@ -25,6 +25,7 @@ import (
"time"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/pkg/errctx"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/sessionctx/stmtctx"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
@ -317,6 +318,7 @@ func TestNewStmtCtx(t *testing.T) {
require.Equal(t, types.DefaultStmtFlags, sc.TypeFlags())
require.Same(t, time.UTC, sc.TimeZone())
require.Same(t, time.UTC, sc.TimeZone())
require.Equal(t, errctx.NewContextWithLevels(errctx.LevelMap{}, sc), sc.ErrCtx())
sc.AppendWarning(errors.NewNoStackError("err1"))
warnings := sc.GetWarnings()
require.Equal(t, 1, len(warnings))
@ -328,6 +330,7 @@ func TestNewStmtCtx(t *testing.T) {
require.Equal(t, types.DefaultStmtFlags, sc.TypeFlags())
require.Same(t, tz, sc.TimeZone())
require.Same(t, tz, sc.TimeZone())
require.Equal(t, errctx.NewContextWithLevels(errctx.LevelMap{}, sc), sc.ErrCtx())
sc.AppendWarning(errors.NewNoStackError("err2"))
warnings = sc.GetWarnings()
require.Equal(t, 1, len(warnings))
@ -347,13 +350,17 @@ func TestSetStmtCtxTypeFlags(t *testing.T) {
sc := stmtctx.NewStmtCtx()
require.Equal(t, types.DefaultStmtFlags, sc.TypeFlags())
levels := errctx.LevelMap{}
sc.SetTypeFlags(types.FlagAllowNegativeToUnsigned | types.FlagSkipASCIICheck)
require.Equal(t, types.FlagAllowNegativeToUnsigned|types.FlagSkipASCIICheck, sc.TypeFlags())
require.Equal(t, sc.TypeFlags(), sc.TypeFlags())
require.Equal(t, errctx.NewContextWithLevels(levels, sc), sc.ErrCtx())
sc.SetTypeFlags(types.FlagSkipASCIICheck | types.FlagSkipUTF8Check | types.FlagTruncateAsWarning)
require.Equal(t, types.FlagSkipASCIICheck|types.FlagSkipUTF8Check|types.FlagTruncateAsWarning, sc.TypeFlags())
require.Equal(t, sc.TypeFlags(), sc.TypeFlags())
levels[errctx.ErrGroupTruncate] = errctx.LevelWarn
require.Equal(t, errctx.NewContextWithLevels(levels, sc), sc.ErrCtx())
}
func TestResetStmtCtx(t *testing.T) {
@ -362,14 +369,17 @@ func TestResetStmtCtx(t *testing.T) {
tz := time.FixedZone("UTC+1", 2*60*60)
sc.SetTimeZone(tz)
sc.SetTypeFlags(types.FlagAllowNegativeToUnsigned | types.FlagSkipASCIICheck)
sc.SetTypeFlags(types.FlagIgnoreTruncateErr | types.FlagAllowNegativeToUnsigned | types.FlagSkipASCIICheck)
sc.AppendWarning(errors.NewNoStackError("err1"))
sc.InRestrictedSQL = true
sc.StmtType = "Insert"
require.Same(t, tz, sc.TimeZone())
require.Equal(t, types.FlagAllowNegativeToUnsigned|types.FlagSkipASCIICheck, sc.TypeFlags())
require.Equal(t, types.FlagIgnoreTruncateErr|types.FlagAllowNegativeToUnsigned|types.FlagSkipASCIICheck, sc.TypeFlags())
require.Equal(t, 1, len(sc.GetWarnings()))
levels := errctx.LevelMap{}
levels[errctx.ErrGroupTruncate] = errctx.LevelIgnore
require.Equal(t, errctx.NewContextWithLevels(levels, sc), sc.ErrCtx())
sc.Reset()
require.Same(t, time.UTC, sc.TimeZone())
@ -384,6 +394,8 @@ func TestResetStmtCtx(t *testing.T) {
require.Equal(t, 1, len(warnings))
require.Equal(t, stmtctx.WarnLevelWarning, warnings[0].Level)
require.Equal(t, "err2", warnings[0].Err.Error())
levels = errctx.LevelMap{}
require.Equal(t, errctx.NewContextWithLevels(levels, sc), sc.ErrCtx())
}
func TestStmtCtxID(t *testing.T) {
@ -413,10 +425,27 @@ func TestErrCtx(t *testing.T) {
// the default errCtx
err := types.ErrTruncated
require.Error(t, sc.HandleError(err))
levels := errctx.LevelMap{}
require.Equal(t, errctx.NewContextWithLevels(levels, sc), sc.ErrCtx())
// reset the types flags will re-initialize the error flag
// set error levels
levels[errctx.ErrGroupAutoIncReadFailed] = errctx.LevelIgnore
sc.SetErrLevels(levels)
require.Equal(t, errctx.NewContextWithLevels(levels, sc), sc.ErrCtx())
// reset the types flags will re-initialize the error flag, but keeps the error levels unchanged except for ErrGroupTruncate
sc.SetTypeFlags(types.DefaultStmtFlags | types.FlagTruncateAsWarning)
require.NoError(t, sc.HandleError(err))
levels = errctx.LevelMap{}
levels[errctx.ErrGroupTruncate] = errctx.LevelWarn
levels[errctx.ErrGroupAutoIncReadFailed] = errctx.LevelIgnore
require.Equal(t, errctx.NewContextWithLevels(levels, sc), sc.ErrCtx())
// SetErrLevels will not affect ErrGroupTruncate
sc.SetErrLevels(errctx.LevelMap{})
levels = errctx.LevelMap{}
levels[errctx.ErrGroupTruncate] = errctx.LevelWarn
require.Equal(t, errctx.NewContextWithLevels(levels, sc), sc.ErrCtx())
}
func BenchmarkErrCtx(b *testing.B) {