From 383105433b3cd52bb3ad544c1915f63fe9f07fe4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E8=B6=85?= Date: Wed, 3 Jan 2024 17:56:02 +0800 Subject: [PATCH] *: move `StmtCtx.ErrAutoincReadFailedAsWarning` to errctx (#49992) close pingcap/tidb#49991 --- pkg/errctx/context.go | 42 ++++++++++-- pkg/errctx/context_test.go | 28 ++++++++ pkg/executor/BUILD.bazel | 1 + pkg/executor/executor.go | 7 +- pkg/executor/insert.go | 8 +-- pkg/sessionctx/stmtctx/BUILD.bazel | 2 + pkg/sessionctx/stmtctx/stmtctx.go | 91 ++++++++++++++------------ pkg/sessionctx/stmtctx/stmtctx_test.go | 35 +++++++++- 8 files changed, 159 insertions(+), 55 deletions(-) diff --git a/pkg/errctx/context.go b/pkg/errctx/context.go index 1bb873ef0b..3f127510f5 100644 --- a/pkg/errctx/context.go +++ b/pkg/errctx/context.go @@ -33,12 +33,25 @@ const ( LevelIgnore ) +// LevelMap indicates the map from `ErrGroup` to `Level` +type LevelMap [errGroupCount]Level + // Context defines how to handle an error type Context struct { - levelMap [errGroupCount]Level + levelMap LevelMap warnHandler contextutil.WarnHandler } +// LevelMap returns the `levelMap` of the context. +func (ctx *Context) LevelMap() LevelMap { + return ctx.levelMap +} + +// LevelForGroup returns the level for a specified group. +func (ctx *Context) LevelForGroup(errGroup ErrGroup) Level { + return ctx.levelMap[errGroup] +} + // WithStrictErrGroupLevel makes the context to return the error directly for any kinds of errors. func (ctx *Context) WithStrictErrGroupLevel() Context { newCtx := Context{ @@ -59,6 +72,14 @@ func (ctx *Context) WithErrGroupLevel(eg ErrGroup, l Level) Context { return newCtx } +// WithErrGroupLevels sets `levelMap` for an `ErrGroup` +func (ctx *Context) WithErrGroupLevels(levels LevelMap) Context { + return Context{ + levelMap: levels, + warnHandler: ctx.warnHandler, + } +} + // appendWarning appends the error to warning. If the inner `warnHandler` is nil, do nothing. func (ctx *Context) appendWarning(err error) { intest.Assert(ctx.warnHandler != nil) @@ -136,9 +157,15 @@ func (ctx *Context) HandleErrorWithAlias(internalErr error, err error, warnErr e // NewContext creates an error context to handle the errors and warnings func NewContext(handler contextutil.WarnHandler) Context { + return NewContextWithLevels(LevelMap{}, handler) +} + +// NewContextWithLevels creates an error context to handle the errors and warnings +func NewContextWithLevels(levels LevelMap, handler contextutil.WarnHandler) Context { intest.Assert(handler != nil) return Context{ warnHandler: handler, + levelMap: levels, } } @@ -153,9 +180,14 @@ type ErrGroup int const ( // ErrGroupTruncate is the group of truncated errors ErrGroupTruncate ErrGroup = iota - // ErrGroupOverflow is the group of overflow errors - ErrGroupOverflow - + // ErrGroupDupKey is the group of duplicate key errors + ErrGroupDupKey + // ErrGroupBadNull is the group of bad null errors + ErrGroupBadNull + // ErrGroupDividedByZero is the group of divided by zero errors + ErrGroupDividedByZero + // ErrGroupAutoIncReadFailed is the group of auto increment read failed errors + ErrGroupAutoIncReadFailed // errGroupCount is the count of all `ErrGroup`. Please leave it at the end of the list. errGroupCount ) @@ -176,4 +208,6 @@ func init() { for _, errCode := range truncateErrCodes { errGroupMap[errCode] = ErrGroupTruncate } + + errGroupMap[errno.ErrAutoincReadFailed] = ErrGroupAutoIncReadFailed } diff --git a/pkg/errctx/context_test.go b/pkg/errctx/context_test.go index 7b6b7c0c9d..1627853662 100644 --- a/pkg/errctx/context_test.go +++ b/pkg/errctx/context_test.go @@ -44,6 +44,15 @@ func TestContext(t *testing.T) { // newCtx will handle the error as a warn require.NoError(t, newCtx.HandleErrorWithAlias(testInternalErr, testErr, testWarn)) require.Equal(t, warn, testWarn) + levels := newCtx.LevelMap() + for i := errctx.ErrGroup(0); i <= errctx.ErrGroup(len(levels)-1); i++ { + if i == errctx.ErrGroupTruncate { + require.Equal(t, errctx.LevelWarn, levels[i]) + } else { + require.Equal(t, errctx.LevelError, levels[i]) + require.Equal(t, levels[i], newCtx.LevelForGroup(i)) + } + } warn = nil newCtx2 := newCtx.WithStrictErrGroupLevel() @@ -52,6 +61,7 @@ func TestContext(t *testing.T) { require.Equal(t, warn, testWarn) // newCtx2 will return all errors require.Equal(t, newCtx2.HandleErrorWithAlias(testInternalErr, testErr, testWarn), testErr) + require.Equal(t, errctx.LevelMap{}, newCtx2.LevelMap()) // test `multierr` testErrs := multierr.Append(testInternalErr, testErr) @@ -61,4 +71,22 @@ func TestContext(t *testing.T) { // test nil require.Nil(t, ctx.HandleError(nil)) + + // test with a level map + levels = errctx.LevelMap{} + levels[errctx.ErrGroupAutoIncReadFailed] = errctx.LevelWarn + ctx = errctx.NewContextWithLevels(levels, contextutil.NewFuncWarnHandlerForTest(func(err error) { + warn = err + })) + require.Equal(t, levels, ctx.LevelMap()) + levels2 := errctx.LevelMap{} + levels2[errctx.ErrGroupAutoIncReadFailed] = errctx.LevelIgnore + ctx = ctx.WithErrGroupLevels(levels2) + require.Equal(t, levels2, ctx.LevelMap()) + + // original levels should not change + ctx = ctx.WithErrGroupLevels(errctx.LevelMap{}) + require.Equal(t, errctx.LevelMap{}, ctx.LevelMap()) + require.Equal(t, errctx.LevelWarn, levels[errctx.ErrGroupAutoIncReadFailed]) + require.Equal(t, errctx.LevelIgnore, levels2[errctx.ErrGroupAutoIncReadFailed]) } diff --git a/pkg/executor/BUILD.bazel b/pkg/executor/BUILD.bazel index b0d47dc039..7bb68be626 100644 --- a/pkg/executor/BUILD.bazel +++ b/pkg/executor/BUILD.bazel @@ -112,6 +112,7 @@ go_library( "//pkg/domain", "//pkg/domain/infosync", "//pkg/domain/resourcegroup", + "//pkg/errctx", "//pkg/errno", "//pkg/executor/aggfuncs", "//pkg/executor/aggregate", diff --git a/pkg/executor/executor.go b/pkg/executor/executor.go index 55261aaf31..397fc2be76 100644 --- a/pkg/executor/executor.go +++ b/pkg/executor/executor.go @@ -37,6 +37,7 @@ import ( "github.com/pingcap/tidb/pkg/ddl/schematracker" "github.com/pingcap/tidb/pkg/domain" "github.com/pingcap/tidb/pkg/domain/infosync" + "github.com/pingcap/tidb/pkg/errctx" "github.com/pingcap/tidb/pkg/executor/aggregate" "github.com/pingcap/tidb/pkg/executor/internal/exec" "github.com/pingcap/tidb/pkg/executor/internal/pdhelper" @@ -2112,15 +2113,19 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) { ResetDeleteStmtCtx(sc, stmt, vars) case *ast.InsertStmt: sc.InInsertStmt = true + var errLevels errctx.LevelMap // For insert statement (not for update statement), disabling the StrictSQLMode // should make TruncateAsWarning and DividedByZeroAsWarning, // but should not make DupKeyAsWarning. sc.DupKeyAsWarning = stmt.IgnoreErr sc.BadNullAsWarning = !vars.StrictSQLMode || stmt.IgnoreErr sc.IgnoreNoPartition = stmt.IgnoreErr - sc.ErrAutoincReadFailedAsWarning = stmt.IgnoreErr + if stmt.IgnoreErr { + errLevels[errctx.ErrGroupAutoIncReadFailed] = errctx.LevelWarn + } sc.DividedByZeroAsWarning = !vars.StrictSQLMode || stmt.IgnoreErr sc.Priority = stmt.Priority + sc.SetErrLevels(errLevels) sc.SetTypeFlags(sc.TypeFlags(). WithTruncateAsWarning(!vars.StrictSQLMode || stmt.IgnoreErr). WithIgnoreInvalidDateErr(vars.SQLMode.HasAllowInvalidDatesMode()). diff --git a/pkg/executor/insert.go b/pkg/executor/insert.go index 1c49038910..64a01990c7 100644 --- a/pkg/executor/insert.go +++ b/pkg/executor/insert.go @@ -308,11 +308,9 @@ func (e *InsertExec) Next(ctx context.Context, req *chunk.Chunk) error { err := insertRows(ctx, e) if err != nil { terr, ok := errors.Cause(err).(*terror.Error) - if ok && len(e.OnDuplicate) == 0 && - e.Ctx().GetSessionVars().StmtCtx.ErrAutoincReadFailedAsWarning && - terr.Code() == errno.ErrAutoincReadFailed { - e.Ctx().GetSessionVars().StmtCtx.AppendWarning(err) - return nil + if ok && len(e.OnDuplicate) == 0 && terr.Code() == errno.ErrAutoincReadFailed { + ec := e.Ctx().GetSessionVars().StmtCtx.ErrCtx() + return ec.HandleError(err) } return err } diff --git a/pkg/sessionctx/stmtctx/BUILD.bazel b/pkg/sessionctx/stmtctx/BUILD.bazel index c8726e67cd..a9f8c30b6c 100644 --- a/pkg/sessionctx/stmtctx/BUILD.bazel +++ b/pkg/sessionctx/stmtctx/BUILD.bazel @@ -14,6 +14,7 @@ go_library( "//pkg/parser/mysql", "//pkg/parser/terror", "//pkg/types", + "//pkg/util/context", "//pkg/util/disk", "//pkg/util/execdetails", "//pkg/util/intest", @@ -43,6 +44,7 @@ go_test( flaky = True, shard_count = 12, deps = [ + "//pkg/errctx", "//pkg/kv", "//pkg/sessionctx/variable", "//pkg/testkit", diff --git a/pkg/sessionctx/stmtctx/stmtctx.go b/pkg/sessionctx/stmtctx/stmtctx.go index 3a586a5ecc..1ec46b1d4f 100644 --- a/pkg/sessionctx/stmtctx/stmtctx.go +++ b/pkg/sessionctx/stmtctx/stmtctx.go @@ -36,6 +36,7 @@ import ( "github.com/pingcap/tidb/pkg/parser/mysql" "github.com/pingcap/tidb/pkg/parser/terror" "github.com/pingcap/tidb/pkg/types" + contextutil "github.com/pingcap/tidb/pkg/util/context" "github.com/pingcap/tidb/pkg/util/disk" "github.com/pingcap/tidb/pkg/util/execdetails" "github.com/pingcap/tidb/pkg/util/intest" @@ -171,32 +172,31 @@ type StatementContext struct { // IsDDLJobInQueue is used to mark whether the DDL job is put into the queue. // If IsDDLJobInQueue is true, it means the DDL job is in the queue of storage, and it can be handled by the DDL worker. - IsDDLJobInQueue bool - DDLJobID int64 - InInsertStmt bool - InUpdateStmt bool - InDeleteStmt bool - InSelectStmt bool - InLoadDataStmt bool - InExplainStmt bool - InExplainAnalyzeStmt bool - ExplainFormat string - InCreateOrAlterStmt bool - InSetSessionStatesStmt bool - InPreparedPlanBuilding bool - DupKeyAsWarning bool - BadNullAsWarning bool - DividedByZeroAsWarning bool - ErrAutoincReadFailedAsWarning bool - InShowWarning bool - UseCache bool - ForcePlanCache bool // force the optimizer to use plan cache even if there is risky optimization, see #49736. - CacheType PlanCacheType - BatchCheck bool - InNullRejectCheck bool - IgnoreNoPartition bool - IgnoreExplainIDSuffix bool - MultiSchemaInfo *model.MultiSchemaInfo + IsDDLJobInQueue bool + DDLJobID int64 + InInsertStmt bool + InUpdateStmt bool + InDeleteStmt bool + InSelectStmt bool + InLoadDataStmt bool + InExplainStmt bool + InExplainAnalyzeStmt bool + ExplainFormat string + InCreateOrAlterStmt bool + InSetSessionStatesStmt bool + InPreparedPlanBuilding bool + DupKeyAsWarning bool + BadNullAsWarning bool + DividedByZeroAsWarning bool + InShowWarning bool + UseCache bool + ForcePlanCache bool // force the optimizer to use plan cache even if there is risky optimization, see #49736. + CacheType PlanCacheType + BatchCheck bool + InNullRejectCheck bool + IgnoreNoPartition bool + IgnoreExplainIDSuffix bool + MultiSchemaInfo *model.MultiSchemaInfo // If the select statement was like 'select * from t as of timestamp ...' or in a stale read transaction // or is affected by the tidb_read_staleness session variable, then the statement will be makred as isStaleness // in stmtCtx @@ -443,17 +443,17 @@ func NewStmtCtxWithTimeZone(tz *time.Location) *StatementContext { ctxID: stmtCtxIDGenerator.Add(1), } sc.typeCtx = types.NewContext(types.DefaultStmtFlags, tz, sc) - sc.initErrCtx() + sc.errCtx = newErrCtx(sc.typeCtx, errctx.LevelMap{}, sc) return sc } // Reset resets a statement context func (sc *StatementContext) Reset() { *sc = StatementContext{ - ctxID: stmtCtxIDGenerator.Add(1), - typeCtx: types.NewContext(types.DefaultStmtFlags, time.UTC, sc), + ctxID: stmtCtxIDGenerator.Add(1), } - sc.initErrCtx() + sc.typeCtx = types.NewContext(types.DefaultStmtFlags, time.UTC, sc) + sc.errCtx = newErrCtx(sc.typeCtx, errctx.LevelMap{}, sc) } // CtxID returns the context id of the statement @@ -482,22 +482,17 @@ func (sc *StatementContext) TypeCtx() types.Context { return sc.typeCtx } -func (sc *StatementContext) initErrCtx() { - ctx := errctx.NewContext(sc) - - if sc.TypeFlags().IgnoreTruncateErr() { - ctx = ctx.WithErrGroupLevel(errctx.ErrGroupTruncate, errctx.LevelIgnore) - } else if sc.TypeFlags().TruncateAsWarning() { - ctx = ctx.WithErrGroupLevel(errctx.ErrGroupTruncate, errctx.LevelWarn) - } - sc.errCtx = ctx -} - // ErrCtx returns the error context func (sc *StatementContext) ErrCtx() errctx.Context { return sc.errCtx } +// SetErrLevels sets the error levels for statement +// The argument otherLevels is used to set the error levels except truncate +func (sc *StatementContext) SetErrLevels(otherLevels errctx.LevelMap) { + sc.errCtx = newErrCtx(sc.typeCtx, otherLevels, sc) +} + // TypeFlags returns the type flags func (sc *StatementContext) TypeFlags() types.Flags { return sc.typeCtx.Flags() @@ -506,7 +501,7 @@ func (sc *StatementContext) TypeFlags() types.Flags { // SetTypeFlags sets the type flags func (sc *StatementContext) SetTypeFlags(flags types.Flags) { sc.typeCtx = sc.typeCtx.WithFlags(flags) - sc.initErrCtx() + sc.errCtx = newErrCtx(sc.typeCtx, sc.errCtx.LevelMap(), sc) } // HandleTruncate ignores or returns the error based on the TypeContext inside. @@ -1415,6 +1410,18 @@ func (sc *StatementContext) TypeCtxOrDefault() types.Context { return types.DefaultStmtNoWarningContext } +func newErrCtx(tc types.Context, otherLevels errctx.LevelMap, handler contextutil.WarnHandler) errctx.Context { + l := errctx.LevelError + if flags := tc.Flags(); flags.IgnoreTruncateErr() { + l = errctx.LevelIgnore + } else if flags.TruncateAsWarning() { + l = errctx.LevelWarn + } + + otherLevels[errctx.ErrGroupTruncate] = l + return errctx.NewContextWithLevels(otherLevels, handler) +} + // UsedStatsInfoForTable records stats that are used during query and their information. type UsedStatsInfoForTable struct { Name string diff --git a/pkg/sessionctx/stmtctx/stmtctx_test.go b/pkg/sessionctx/stmtctx/stmtctx_test.go index 29cb662b08..4e0fe05a69 100644 --- a/pkg/sessionctx/stmtctx/stmtctx_test.go +++ b/pkg/sessionctx/stmtctx/stmtctx_test.go @@ -25,6 +25,7 @@ import ( "time" "github.com/pingcap/errors" + "github.com/pingcap/tidb/pkg/errctx" "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/sessionctx/stmtctx" "github.com/pingcap/tidb/pkg/sessionctx/variable" @@ -317,6 +318,7 @@ func TestNewStmtCtx(t *testing.T) { require.Equal(t, types.DefaultStmtFlags, sc.TypeFlags()) require.Same(t, time.UTC, sc.TimeZone()) require.Same(t, time.UTC, sc.TimeZone()) + require.Equal(t, errctx.NewContextWithLevels(errctx.LevelMap{}, sc), sc.ErrCtx()) sc.AppendWarning(errors.NewNoStackError("err1")) warnings := sc.GetWarnings() require.Equal(t, 1, len(warnings)) @@ -328,6 +330,7 @@ func TestNewStmtCtx(t *testing.T) { require.Equal(t, types.DefaultStmtFlags, sc.TypeFlags()) require.Same(t, tz, sc.TimeZone()) require.Same(t, tz, sc.TimeZone()) + require.Equal(t, errctx.NewContextWithLevels(errctx.LevelMap{}, sc), sc.ErrCtx()) sc.AppendWarning(errors.NewNoStackError("err2")) warnings = sc.GetWarnings() require.Equal(t, 1, len(warnings)) @@ -347,13 +350,17 @@ func TestSetStmtCtxTypeFlags(t *testing.T) { sc := stmtctx.NewStmtCtx() require.Equal(t, types.DefaultStmtFlags, sc.TypeFlags()) + levels := errctx.LevelMap{} sc.SetTypeFlags(types.FlagAllowNegativeToUnsigned | types.FlagSkipASCIICheck) require.Equal(t, types.FlagAllowNegativeToUnsigned|types.FlagSkipASCIICheck, sc.TypeFlags()) require.Equal(t, sc.TypeFlags(), sc.TypeFlags()) + require.Equal(t, errctx.NewContextWithLevels(levels, sc), sc.ErrCtx()) sc.SetTypeFlags(types.FlagSkipASCIICheck | types.FlagSkipUTF8Check | types.FlagTruncateAsWarning) require.Equal(t, types.FlagSkipASCIICheck|types.FlagSkipUTF8Check|types.FlagTruncateAsWarning, sc.TypeFlags()) require.Equal(t, sc.TypeFlags(), sc.TypeFlags()) + levels[errctx.ErrGroupTruncate] = errctx.LevelWarn + require.Equal(t, errctx.NewContextWithLevels(levels, sc), sc.ErrCtx()) } func TestResetStmtCtx(t *testing.T) { @@ -362,14 +369,17 @@ func TestResetStmtCtx(t *testing.T) { tz := time.FixedZone("UTC+1", 2*60*60) sc.SetTimeZone(tz) - sc.SetTypeFlags(types.FlagAllowNegativeToUnsigned | types.FlagSkipASCIICheck) + sc.SetTypeFlags(types.FlagIgnoreTruncateErr | types.FlagAllowNegativeToUnsigned | types.FlagSkipASCIICheck) sc.AppendWarning(errors.NewNoStackError("err1")) sc.InRestrictedSQL = true sc.StmtType = "Insert" require.Same(t, tz, sc.TimeZone()) - require.Equal(t, types.FlagAllowNegativeToUnsigned|types.FlagSkipASCIICheck, sc.TypeFlags()) + require.Equal(t, types.FlagIgnoreTruncateErr|types.FlagAllowNegativeToUnsigned|types.FlagSkipASCIICheck, sc.TypeFlags()) require.Equal(t, 1, len(sc.GetWarnings())) + levels := errctx.LevelMap{} + levels[errctx.ErrGroupTruncate] = errctx.LevelIgnore + require.Equal(t, errctx.NewContextWithLevels(levels, sc), sc.ErrCtx()) sc.Reset() require.Same(t, time.UTC, sc.TimeZone()) @@ -384,6 +394,8 @@ func TestResetStmtCtx(t *testing.T) { require.Equal(t, 1, len(warnings)) require.Equal(t, stmtctx.WarnLevelWarning, warnings[0].Level) require.Equal(t, "err2", warnings[0].Err.Error()) + levels = errctx.LevelMap{} + require.Equal(t, errctx.NewContextWithLevels(levels, sc), sc.ErrCtx()) } func TestStmtCtxID(t *testing.T) { @@ -413,10 +425,27 @@ func TestErrCtx(t *testing.T) { // the default errCtx err := types.ErrTruncated require.Error(t, sc.HandleError(err)) + levels := errctx.LevelMap{} + require.Equal(t, errctx.NewContextWithLevels(levels, sc), sc.ErrCtx()) - // reset the types flags will re-initialize the error flag + // set error levels + levels[errctx.ErrGroupAutoIncReadFailed] = errctx.LevelIgnore + sc.SetErrLevels(levels) + require.Equal(t, errctx.NewContextWithLevels(levels, sc), sc.ErrCtx()) + + // reset the types flags will re-initialize the error flag, but keeps the error levels unchanged except for ErrGroupTruncate sc.SetTypeFlags(types.DefaultStmtFlags | types.FlagTruncateAsWarning) require.NoError(t, sc.HandleError(err)) + levels = errctx.LevelMap{} + levels[errctx.ErrGroupTruncate] = errctx.LevelWarn + levels[errctx.ErrGroupAutoIncReadFailed] = errctx.LevelIgnore + require.Equal(t, errctx.NewContextWithLevels(levels, sc), sc.ErrCtx()) + + // SetErrLevels will not affect ErrGroupTruncate + sc.SetErrLevels(errctx.LevelMap{}) + levels = errctx.LevelMap{} + levels[errctx.ErrGroupTruncate] = errctx.LevelWarn + require.Equal(t, errctx.NewContextWithLevels(levels, sc), sc.ErrCtx()) } func BenchmarkErrCtx(b *testing.B) {