// Copyright 2015 PingCAP, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // Copyright 2013 The ql Authors. All rights reserved. // Use of this source code is governed by a BSD-style // license that can be found in the LICENSES/QL-LICENSE file. package session import ( "bytes" "context" "crypto/tls" "encoding/hex" "encoding/json" stderrs "errors" "fmt" "iter" "math" "math/rand" "regexp" "runtime/pprof" "slices" "strconv" "strings" "sync" "sync/atomic" "time" "github.com/ngaut/pools" "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/tidb/pkg/bindinfo" "github.com/pingcap/tidb/pkg/config" "github.com/pingcap/tidb/pkg/config/kerneltype" "github.com/pingcap/tidb/pkg/ddl" "github.com/pingcap/tidb/pkg/ddl/placement" distsqlctx "github.com/pingcap/tidb/pkg/distsql/context" "github.com/pingcap/tidb/pkg/domain" "github.com/pingcap/tidb/pkg/domain/infosync" "github.com/pingcap/tidb/pkg/domain/sqlsvrapi" "github.com/pingcap/tidb/pkg/dxf/framework/proto" "github.com/pingcap/tidb/pkg/dxf/framework/scheduler" "github.com/pingcap/tidb/pkg/dxf/framework/taskexecutor" "github.com/pingcap/tidb/pkg/dxf/importinto" "github.com/pingcap/tidb/pkg/errno" "github.com/pingcap/tidb/pkg/executor" "github.com/pingcap/tidb/pkg/executor/staticrecordset" "github.com/pingcap/tidb/pkg/expression" "github.com/pingcap/tidb/pkg/expression/exprctx" "github.com/pingcap/tidb/pkg/expression/exprstatic" "github.com/pingcap/tidb/pkg/expression/sessionexpr" "github.com/pingcap/tidb/pkg/extension" "github.com/pingcap/tidb/pkg/extension/extensionimpl" "github.com/pingcap/tidb/pkg/infoschema" infoschemactx "github.com/pingcap/tidb/pkg/infoschema/context" "github.com/pingcap/tidb/pkg/infoschema/issyncer" "github.com/pingcap/tidb/pkg/infoschema/validatorapi" "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/meta" "github.com/pingcap/tidb/pkg/meta/metabuild" "github.com/pingcap/tidb/pkg/meta/metadef" "github.com/pingcap/tidb/pkg/meta/model" "github.com/pingcap/tidb/pkg/metrics" "github.com/pingcap/tidb/pkg/owner" "github.com/pingcap/tidb/pkg/param" "github.com/pingcap/tidb/pkg/parser" "github.com/pingcap/tidb/pkg/parser/ast" "github.com/pingcap/tidb/pkg/parser/auth" "github.com/pingcap/tidb/pkg/parser/charset" "github.com/pingcap/tidb/pkg/parser/mysql" "github.com/pingcap/tidb/pkg/parser/terror" plannercore "github.com/pingcap/tidb/pkg/planner/core" "github.com/pingcap/tidb/pkg/planner/core/base" "github.com/pingcap/tidb/pkg/planner/core/operator/physicalop" "github.com/pingcap/tidb/pkg/planner/core/resolve" planctx "github.com/pingcap/tidb/pkg/planner/planctx" "github.com/pingcap/tidb/pkg/plugin" "github.com/pingcap/tidb/pkg/privilege" "github.com/pingcap/tidb/pkg/privilege/conn" "github.com/pingcap/tidb/pkg/privilege/privileges" "github.com/pingcap/tidb/pkg/session/cursor" session_metrics "github.com/pingcap/tidb/pkg/session/metrics" "github.com/pingcap/tidb/pkg/session/sessionapi" "github.com/pingcap/tidb/pkg/session/sessmgr" "github.com/pingcap/tidb/pkg/session/txninfo" "github.com/pingcap/tidb/pkg/sessionctx" "github.com/pingcap/tidb/pkg/sessionctx/sessionstates" "github.com/pingcap/tidb/pkg/sessionctx/stmtctx" "github.com/pingcap/tidb/pkg/sessionctx/vardef" "github.com/pingcap/tidb/pkg/sessionctx/variable" "github.com/pingcap/tidb/pkg/sessiontxn" statshandle "github.com/pingcap/tidb/pkg/statistics/handle" "github.com/pingcap/tidb/pkg/statistics/handle/syncload" "github.com/pingcap/tidb/pkg/statistics/handle/usage" "github.com/pingcap/tidb/pkg/statistics/handle/usage/indexusage" kvstore "github.com/pingcap/tidb/pkg/store" storeerr "github.com/pingcap/tidb/pkg/store/driver/error" "github.com/pingcap/tidb/pkg/store/helper" "github.com/pingcap/tidb/pkg/table" "github.com/pingcap/tidb/pkg/table/tblctx" "github.com/pingcap/tidb/pkg/table/tblsession" "github.com/pingcap/tidb/pkg/table/temptable" "github.com/pingcap/tidb/pkg/tablecodec" "github.com/pingcap/tidb/pkg/telemetry" "github.com/pingcap/tidb/pkg/util" "github.com/pingcap/tidb/pkg/util/chunk" "github.com/pingcap/tidb/pkg/util/collate" "github.com/pingcap/tidb/pkg/util/dbterror" "github.com/pingcap/tidb/pkg/util/dbterror/exeerrors" "github.com/pingcap/tidb/pkg/util/dbterror/plannererrors" "github.com/pingcap/tidb/pkg/util/execdetails" "github.com/pingcap/tidb/pkg/util/intest" "github.com/pingcap/tidb/pkg/util/logutil" "github.com/pingcap/tidb/pkg/util/logutil/consistency" "github.com/pingcap/tidb/pkg/util/memory" parserutil "github.com/pingcap/tidb/pkg/util/parser" rangerctx "github.com/pingcap/tidb/pkg/util/ranger/context" "github.com/pingcap/tidb/pkg/util/redact" sem "github.com/pingcap/tidb/pkg/util/sem/compat" "github.com/pingcap/tidb/pkg/util/sli" "github.com/pingcap/tidb/pkg/util/sqlescape" "github.com/pingcap/tidb/pkg/util/sqlexec" "github.com/pingcap/tidb/pkg/util/syncutil" "github.com/pingcap/tidb/pkg/util/timeutil" "github.com/pingcap/tidb/pkg/util/topsql" topsqlstate "github.com/pingcap/tidb/pkg/util/topsql/state" "github.com/pingcap/tidb/pkg/util/topsql/stmtstats" "github.com/pingcap/tidb/pkg/util/traceevent" "github.com/pingcap/tidb/pkg/util/tracing" tikverr "github.com/tikv/client-go/v2/error" "github.com/tikv/client-go/v2/oracle" "github.com/tikv/client-go/v2/trace" "github.com/tikv/client-go/v2/txnkv/transaction" tikvutil "github.com/tikv/client-go/v2/util" rmclient "github.com/tikv/pd/client/resource_group/controller" "go.uber.org/zap" "go.uber.org/zap/zapcore" ) func init() { executor.CreateSession = func(ctx sessionctx.Context) (sessionctx.Context, error) { return CreateSession(ctx.GetStore()) } executor.CloseSession = func(ctx sessionctx.Context) { if se, ok := ctx.(sessionapi.Session); ok { se.Close() } } } var _ sessionapi.Session = (*session)(nil) type stmtRecord struct { st sqlexec.Statement stmtCtx *stmtctx.StatementContext } // StmtHistory holds all histories of statements in a txn. type StmtHistory struct { history []*stmtRecord } // Add appends a stmt to history list. func (h *StmtHistory) Add(st sqlexec.Statement, stmtCtx *stmtctx.StatementContext) { s := &stmtRecord{ st: st, stmtCtx: stmtCtx, } h.history = append(h.history, s) } // Count returns the count of the history. func (h *StmtHistory) Count() int { return len(h.history) } type session struct { // processInfo is used by ShowProcess(), and should be modified atomically. processInfo atomic.Pointer[sessmgr.ProcessInfo] txn LazyTxn mu struct { sync.RWMutex values map[fmt.Stringer]any } currentCtx context.Context // only use for runtime.trace, Please NEVER use it. currentPlan base.Plan // dom is *domain.Domain, use `any` to avoid import cycle. // cross keyspace session doesn't have domain set. dom any // we cannot compare dom == nil, as dom is untyped, golang will always return false. crossKS bool schemaValidator validatorapi.Validator infoCache *infoschema.InfoCache store kv.Storage sessionPlanCache sessionctx.SessionPlanCache sessionVars *variable.SessionVars sessionManager sessmgr.Manager pctx *planContextImpl exprctx *sessionexpr.ExprContext tblctx *tblsession.MutateContext statsCollector *usage.SessionStatsItem // ddlOwnerManager is used in `select tidb_is_ddl_owner()` statement; ddlOwnerManager owner.Manager // lockedTables use to record the table locks hold by the session. lockedTables map[int64]model.TableLockTpInfo // client shared coprocessor client per session client kv.Client mppClient kv.MPPClient // indexUsageCollector collects index usage information. idxUsageCollector *indexusage.SessionIndexUsageCollector functionUsageMu struct { syncutil.RWMutex builtinFunctionUsage telemetry.BuiltinFunctionsUsage } // StmtStats is used to count various indicators of each SQL in this session // at each point in time. These data will be periodically taken away by the // background goroutine. The background goroutine will continue to aggregate // all the local data in each session, and finally report them to the remote // regularly. stmtStats *stmtstats.StatementStats // Used to encode and decode each type of session states. sessionStatesHandlers map[sessionstates.SessionStateType]sessionctx.SessionStatesHandler // Contains a list of sessions used to collect advisory locks. advisoryLocks map[string]*advisoryLock extensions *extension.SessionExtensions sandBoxMode bool cursorTracker cursor.Tracker // Used to wait for all async commit background jobs to finish. commitWaitGroup sync.WaitGroup } // GetTraceCtx returns the trace context of the session. func (s *session) GetTraceCtx() context.Context { return s.currentCtx } // AddTableLock adds table lock to the session lock map. func (s *session) AddTableLock(locks []model.TableLockTpInfo) { for _, l := range locks { // read only lock is session unrelated, skip it when adding lock to session. if l.Tp != ast.TableLockReadOnly { s.lockedTables[l.TableID] = l } } } // ReleaseTableLocks releases table lock in the session lock map. func (s *session) ReleaseTableLocks(locks []model.TableLockTpInfo) { for _, l := range locks { delete(s.lockedTables, l.TableID) } } // ReleaseTableLockByTableIDs releases table lock in the session lock map by table ID. func (s *session) ReleaseTableLockByTableIDs(tableIDs []int64) { for _, tblID := range tableIDs { delete(s.lockedTables, tblID) } } // CheckTableLocked checks the table lock. func (s *session) CheckTableLocked(tblID int64) (bool, ast.TableLockType) { lt, ok := s.lockedTables[tblID] if !ok { return false, ast.TableLockNone } return true, lt.Tp } // GetAllTableLocks gets all table locks table id and db id hold by the session. func (s *session) GetAllTableLocks() []model.TableLockTpInfo { lockTpInfo := make([]model.TableLockTpInfo, 0, len(s.lockedTables)) for _, tl := range s.lockedTables { lockTpInfo = append(lockTpInfo, tl) } return lockTpInfo } // HasLockedTables uses to check whether this session locked any tables. // If so, the session can only visit the table which locked by self. func (s *session) HasLockedTables() bool { b := len(s.lockedTables) > 0 return b } // ReleaseAllTableLocks releases all table locks hold by the session. func (s *session) ReleaseAllTableLocks() { s.lockedTables = make(map[int64]model.TableLockTpInfo) } // IsDDLOwner checks whether this session is DDL owner. func (s *session) IsDDLOwner() bool { return s.ddlOwnerManager.IsOwner() } func (s *session) cleanRetryInfo() { if s.sessionVars.RetryInfo.Retrying { return } retryInfo := s.sessionVars.RetryInfo defer retryInfo.Clean() if len(retryInfo.DroppedPreparedStmtIDs) == 0 { return } planCacheEnabled := s.GetSessionVars().EnablePreparedPlanCache var cacheKey string var err error var preparedObj *plannercore.PlanCacheStmt if planCacheEnabled { firstStmtID := retryInfo.DroppedPreparedStmtIDs[0] if preparedPointer, ok := s.sessionVars.PreparedStmts[firstStmtID]; ok { preparedObj, ok = preparedPointer.(*plannercore.PlanCacheStmt) if ok { cacheKey, _, _, _, err = plannercore.NewPlanCacheKey(s, preparedObj) if err != nil { logutil.Logger(s.currentCtx).Warn("clean cached plan failed", zap.Error(err)) return } } } } for i, stmtID := range retryInfo.DroppedPreparedStmtIDs { if planCacheEnabled { if i > 0 && preparedObj != nil { cacheKey, _, _, _, err = plannercore.NewPlanCacheKey(s, preparedObj) if err != nil { logutil.Logger(s.currentCtx).Warn("clean cached plan failed", zap.Error(err)) return } } if !s.sessionVars.IgnorePreparedCacheCloseStmt { // keep the plan in cache s.GetSessionPlanCache().Delete(cacheKey) } } s.sessionVars.RemovePreparedStmt(stmtID) } } func (s *session) Status() uint16 { return s.sessionVars.Status() } func (s *session) LastInsertID() uint64 { if s.sessionVars.StmtCtx.LastInsertID > 0 { return s.sessionVars.StmtCtx.LastInsertID } return s.sessionVars.StmtCtx.InsertID } func (s *session) LastMessage() string { return s.sessionVars.StmtCtx.GetMessage() } func (s *session) AffectedRows() uint64 { return s.sessionVars.StmtCtx.AffectedRows() } func (s *session) SetClientCapability(capability uint32) { s.sessionVars.ClientCapability = capability } func (s *session) SetConnectionID(connectionID uint64) { s.sessionVars.ConnectionID = connectionID } func (s *session) SetTLSState(tlsState *tls.ConnectionState) { // If user is not connected via TLS, then tlsState == nil. if tlsState != nil { s.sessionVars.TLSConnectionState = tlsState } } func (s *session) SetCompressionAlgorithm(ca int) { s.sessionVars.CompressionAlgorithm = ca } func (s *session) SetCompressionLevel(level int) { s.sessionVars.CompressionLevel = level } func (s *session) SetCommandValue(command byte) { atomic.StoreUint32(&s.sessionVars.CommandValue, uint32(command)) } func (s *session) SetCollation(coID int) error { cs, co, err := charset.GetCharsetInfoByID(coID) if err != nil { return err } // If new collations are enabled, switch to the default // collation if this one is not supported. co = collate.SubstituteMissingCollationToDefault(co) for _, v := range vardef.SetNamesVariables { terror.Log(s.sessionVars.SetSystemVarWithoutValidation(v, cs)) } return s.sessionVars.SetSystemVarWithoutValidation(vardef.CollationConnection, co) } func (s *session) GetSessionPlanCache() sessionctx.SessionPlanCache { // use the prepared plan cache if !s.GetSessionVars().EnablePreparedPlanCache && !s.GetSessionVars().EnableNonPreparedPlanCache { return nil } if s.sessionPlanCache == nil { // lazy construction s.sessionPlanCache = plannercore.NewLRUPlanCache(uint(s.GetSessionVars().SessionPlanCacheSize), vardef.PreparedPlanCacheMemoryGuardRatio.Load(), plannercore.PreparedPlanCacheMaxMemory.Load(), s, false) } return s.sessionPlanCache } func (s *session) SetSessionManager(sm sessmgr.Manager) { s.sessionManager = sm } func (s *session) GetSessionManager() sessmgr.Manager { return s.sessionManager } func (s *session) UpdateColStatsUsage(colStatsUsage iter.Seq[model.TableItemID]) { if s.statsCollector == nil { return } t := time.Now() s.statsCollector.UpdateColStatsUsage(colStatsUsage, t) } // FieldList returns fields list of a table. func (s *session) FieldList(tableName string) ([]*resolve.ResultField, error) { is := s.GetInfoSchema().(infoschema.InfoSchema) dbName := ast.NewCIStr(s.GetSessionVars().CurrentDB) tName := ast.NewCIStr(tableName) pm := privilege.GetPrivilegeManager(s) if pm != nil && s.sessionVars.User != nil { if !pm.RequestVerification(s.sessionVars.ActiveRoles, dbName.O, tName.O, "", mysql.AllPrivMask) { user := s.sessionVars.User u := user.Username h := user.Hostname if len(user.AuthUsername) > 0 && len(user.AuthHostname) > 0 { u = user.AuthUsername h = user.AuthHostname } return nil, plannererrors.ErrTableaccessDenied.GenWithStackByArgs("SELECT", u, h, tableName) } } table, err := is.TableByName(context.Background(), dbName, tName) if err != nil { return nil, err } cols := table.Cols() fields := make([]*resolve.ResultField, 0, len(cols)) for _, col := range table.Cols() { rf := &resolve.ResultField{ ColumnAsName: col.Name, TableAsName: tName, DBName: dbName, Table: table.Meta(), Column: col.ColumnInfo, } fields = append(fields, rf) } return fields, nil } // TxnInfo returns a pointer to a *copy* of the internal TxnInfo, thus is *read only* // Process field may not initialize if this is a session used internally. func (s *session) TxnInfo() *txninfo.TxnInfo { s.txn.mu.RLock() // Copy on read to get a snapshot, this API shouldn't be frequently called. txnInfo := s.txn.mu.TxnInfo s.txn.mu.RUnlock() if txnInfo.StartTS == 0 { return nil } processInfo := s.ShowProcess() if processInfo == nil { return &txnInfo } txnInfo.ProcessInfo = &txninfo.ProcessInfo{ ConnectionID: processInfo.ID, Username: processInfo.User, CurrentDB: processInfo.DB, RelatedTableIDs: make(map[int64]struct{}), } s.GetSessionVars().GetRelatedTableForMDL().Range(func(key, _ any) bool { txnInfo.ProcessInfo.RelatedTableIDs[key.(int64)] = struct{}{} return true }) return &txnInfo } func (s *session) doCommit(ctx context.Context) error { if !s.txn.Valid() { return nil } defer func() { s.txn.changeToInvalid() s.sessionVars.SetInTxn(false) s.sessionVars.ClearDiskFullOpt() }() // check if the transaction is read-only if s.txn.IsReadOnly() { return nil } // check if the cluster is read-only if !s.sessionVars.InRestrictedSQL && (vardef.RestrictedReadOnly.Load() || vardef.VarTiDBSuperReadOnly.Load()) { // It is not internal SQL, and the cluster has one of RestrictedReadOnly or SuperReadOnly // We need to privilege check again: a privilege check occurred during planning, but we need // to prevent the case that a long running auto-commit statement is now trying to commit. pm := privilege.GetPrivilegeManager(s) roles := s.sessionVars.ActiveRoles if pm != nil && !pm.HasExplicitlyGrantedDynamicPrivilege(roles, "RESTRICTED_REPLICA_WRITER_ADMIN", false) { s.RollbackTxn(ctx) return plannererrors.ErrSQLInReadOnlyMode } } err := s.checkPlacementPolicyBeforeCommit(ctx) if err != nil { return err } // mockCommitError and mockGetTSErrorInRetry use to test PR #8743. failpoint.Inject("mockCommitError", func(val failpoint.Value) { if val.(bool) { if _, err := failpoint.Eval("tikvclient/mockCommitErrorOpt"); err == nil { failpoint.Return(kv.ErrTxnRetryable) } } }) sessVars := s.GetSessionVars() var commitTSChecker func(uint64) bool if tables := sessVars.TxnCtx.CachedTables; len(tables) > 0 { c := cachedTableRenewLease{tables: tables} now := time.Now() err := c.start(ctx) defer c.stop(ctx) sessVars.StmtCtx.WaitLockLeaseTime += time.Since(now) if err != nil { return errors.Trace(err) } commitTSChecker = c.commitTSCheck } if err = sessiontxn.GetTxnManager(s).SetOptionsBeforeCommit(s.txn.Transaction, commitTSChecker); err != nil { return err } err = s.commitTxnWithTemporaryData(tikvutil.SetSessionID(ctx, sessVars.ConnectionID), &s.txn) if err != nil { err = s.handleAssertionFailure(ctx, err) } return err } type cachedTableRenewLease struct { tables map[int64]any lease []uint64 // Lease for each visited cached tables. exit chan struct{} } func (c *cachedTableRenewLease) start(ctx context.Context) error { c.exit = make(chan struct{}) c.lease = make([]uint64, len(c.tables)) wg := make(chan error, len(c.tables)) ith := 0 for _, raw := range c.tables { tbl := raw.(table.CachedTable) go tbl.WriteLockAndKeepAlive(ctx, c.exit, &c.lease[ith], wg) ith++ } // Wait for all LockForWrite() return, this function can return. var err error for ; ith > 0; ith-- { tmp := <-wg if tmp != nil { err = tmp } } return err } func (c *cachedTableRenewLease) stop(_ context.Context) { close(c.exit) } func (c *cachedTableRenewLease) commitTSCheck(commitTS uint64) bool { for i := range c.lease { lease := atomic.LoadUint64(&c.lease[i]) if commitTS >= lease { // Txn fails to commit because the write lease is expired. return false } } return true } // handleAssertionFailure extracts the possible underlying assertionFailed error, // gets the corresponding MVCC history and logs it. // If it's not an assertion failure, returns the original error. func (s *session) handleAssertionFailure(ctx context.Context, err error) error { var assertionFailure *tikverr.ErrAssertionFailed if !stderrs.As(err, &assertionFailure) { return err } key := assertionFailure.Key newErr := kv.ErrAssertionFailed.GenWithStackByArgs( hex.EncodeToString(key), assertionFailure.Assertion.String(), assertionFailure.StartTs, assertionFailure.ExistingStartTs, assertionFailure.ExistingCommitTs, ) rmode := s.GetSessionVars().EnableRedactLog if rmode == errors.RedactLogEnable { return newErr } var decodeFunc func(kv.Key, *kvrpcpb.MvccGetByKeyResponse, map[string]any) // if it's a record key or an index key, decode it if infoSchema, ok := s.sessionVars.TxnCtx.InfoSchema.(infoschema.InfoSchema); ok && infoSchema != nil && (tablecodec.IsRecordKey(key) || tablecodec.IsIndexKey(key)) { tableOrPartitionID := tablecodec.DecodeTableID(key) tbl, ok := infoSchema.TableByID(ctx, tableOrPartitionID) if !ok { tbl, _, _ = infoSchema.FindTableByPartitionID(tableOrPartitionID) } if tbl == nil { logutil.Logger(ctx).Warn("cannot find table by id", zap.Int64("tableID", tableOrPartitionID), zap.String("key", hex.EncodeToString(key))) return newErr } if tablecodec.IsRecordKey(key) { decodeFunc = consistency.DecodeRowMvccData(tbl.Meta()) } else { tableInfo := tbl.Meta() _, indexID, _, e := tablecodec.DecodeIndexKey(key) if e != nil { logutil.Logger(ctx).Error("assertion failed but cannot decode index key", zap.Error(e)) return newErr } var indexInfo *model.IndexInfo for _, idx := range tableInfo.Indices { if idx.ID == indexID { indexInfo = idx break } } if indexInfo == nil { return newErr } decodeFunc = consistency.DecodeIndexMvccData(indexInfo) } } if store, ok := s.store.(helper.Storage); ok { content := consistency.GetMvccByKey(store, key, decodeFunc) logutil.Logger(ctx).Error("assertion failed", zap.String("message", newErr.Error()), zap.String("mvcc history", redact.String(rmode, content))) } return newErr } func (s *session) commitTxnWithTemporaryData(ctx context.Context, txn kv.Transaction) error { sessVars := s.sessionVars txnTempTables := sessVars.TxnCtx.TemporaryTables if len(txnTempTables) == 0 { failpoint.Inject("mockSleepBeforeTxnCommit", func(v failpoint.Value) { ms := v.(int) time.Sleep(time.Millisecond * time.Duration(ms)) }) return txn.Commit(ctx) } sessionData := sessVars.TemporaryTableData var ( stage kv.StagingHandle localTempTables *infoschema.SessionTables ) if sessVars.LocalTemporaryTables != nil { localTempTables = sessVars.LocalTemporaryTables.(*infoschema.SessionTables) } else { localTempTables = new(infoschema.SessionTables) } defer func() { // stage != kv.InvalidStagingHandle means error occurs, we need to cleanup sessionData if stage != kv.InvalidStagingHandle { sessionData.Cleanup(stage) } }() for tblID, tbl := range txnTempTables { if !tbl.GetModified() { continue } if tbl.GetMeta().TempTableType != model.TempTableLocal { continue } if _, ok := localTempTables.TableByID(tblID); !ok { continue } if stage == kv.InvalidStagingHandle { stage = sessionData.Staging() } tblPrefix := tablecodec.EncodeTablePrefix(tblID) endKey := tablecodec.EncodeTablePrefix(tblID + 1) txnMemBuffer := s.txn.GetMemBuffer() iter, err := txnMemBuffer.Iter(tblPrefix, endKey) if err != nil { return err } for iter.Valid() { key := iter.Key() if !bytes.HasPrefix(key, tblPrefix) { break } value := iter.Value() if len(value) == 0 { err = sessionData.DeleteTableKey(tblID, key) } else { err = sessionData.SetTableKey(tblID, key, iter.Value()) } if err != nil { return err } err = iter.Next() if err != nil { return err } } } err := txn.Commit(ctx) if err != nil { return err } if stage != kv.InvalidStagingHandle { sessionData.Release(stage) stage = kv.InvalidStagingHandle } return nil } // errIsNoisy is used to filter DUPLICATE KEY errors. // These can observed by users in INFORMATION_SCHEMA.CLIENT_ERRORS_SUMMARY_GLOBAL instead. // // The rationale for filtering these errors is because they are "client generated errors". i.e. // of the errors defined in kv/error.go, these look to be clearly related to a client-inflicted issue, // and the server is only responsible for handling the error correctly. It does not need to log. func errIsNoisy(err error) bool { if kv.ErrKeyExists.Equal(err) { return true } if storeerr.ErrLockAcquireFailAndNoWaitSet.Equal(err) { return true } return false } func (s *session) doCommitWithRetry(ctx context.Context) error { defer func() { s.GetSessionVars().SetTxnIsolationLevelOneShotStateForNextTxn() s.txn.changeToInvalid() s.cleanRetryInfo() sessiontxn.GetTxnManager(s).OnTxnEnd() }() if !s.txn.Valid() { // If the transaction is invalid, maybe it has already been rolled back by the client. return nil } isInternalTxn := false if internal := s.txn.GetOption(kv.RequestSourceInternal); internal != nil && internal.(bool) { isInternalTxn = true } var err error txnSize := s.txn.Size() isPessimistic := s.sessionVars.TxnCtx.IsPessimistic isPipelined := s.txn.IsPipelined() r, ctx := tracing.StartRegionEx(ctx, "session.doCommitWithRetry") defer r.End() // Emit txn.commit.start trace event startTS := s.sessionVars.TxnCtx.StartTS if traceevent.IsEnabled(traceevent.TxnLifecycle) { traceevent.TraceEvent(ctx, traceevent.TxnLifecycle, "txn.commit.start", zap.Uint64("start_ts", startTS), zap.Bool("pessimistic", isPessimistic), zap.Bool("pipelined", isPipelined), zap.Int("txn_size", txnSize), zap.Uint64("conn_id", s.sessionVars.ConnectionID), ) } // Defer txn.commit.finish to capture final result defer func() { if traceevent.IsEnabled(traceevent.TxnLifecycle) { fields := []zap.Field{ zap.Uint64("start_ts", startTS), zap.Bool("pessimistic", isPessimistic), zap.Bool("pipelined", isPipelined), zap.Uint64("conn_id", s.sessionVars.ConnectionID), } if s.txn.lastCommitTS > 0 { fields = append(fields, zap.Uint64("commit_ts", s.txn.lastCommitTS)) } if err != nil { fields = append(fields, zap.Error(err)) } traceevent.TraceEvent(ctx, traceevent.TxnLifecycle, "txn.commit.finish", fields...) } }() err = s.doCommit(ctx) if err != nil { // polish the Write Conflict error message newErr := s.tryReplaceWriteConflictError(ctx, err) if newErr != nil { err = newErr } commitRetryLimit := s.sessionVars.RetryLimit if !s.sessionVars.TxnCtx.CouldRetry { commitRetryLimit = 0 } // Don't retry in BatchInsert mode. As a counter-example, insert into t1 select * from t2, // BatchInsert already commit the first batch 1000 rows, then it commit 1000-2000 and retry the statement, // Finally t1 will have more data than t2, with no errors return to user! if s.isTxnRetryableError(err) && !s.sessionVars.BatchInsert && commitRetryLimit > 0 && !isPessimistic && !isPipelined { logutil.Logger(ctx).Warn("sql", zap.String("label", s.GetSQLLabel()), zap.Error(err), zap.String("txn", s.txn.GoString())) // Transactions will retry 2 ~ commitRetryLimit times. // We make larger transactions retry less times to prevent cluster resource outage. txnSizeRate := float64(txnSize) / float64(kv.TxnTotalSizeLimit.Load()) maxRetryCount := commitRetryLimit - int64(float64(commitRetryLimit-1)*txnSizeRate) err = s.retry(ctx, uint(maxRetryCount)) } else if !errIsNoisy(err) { logutil.Logger(ctx).Warn("can not retry txn", zap.String("label", s.GetSQLLabel()), zap.Error(err), zap.Bool("IsBatchInsert", s.sessionVars.BatchInsert), zap.Bool("IsPessimistic", isPessimistic), zap.Bool("InRestrictedSQL", s.sessionVars.InRestrictedSQL), zap.Int64("tidb_retry_limit", s.sessionVars.RetryLimit), zap.Bool("tidb_disable_txn_auto_retry", s.sessionVars.DisableTxnAutoRetry)) } } counter := s.sessionVars.TxnCtx.StatementCount duration := time.Since(s.GetSessionVars().TxnCtx.CreateTime).Seconds() s.recordOnTransactionExecution(err, counter, duration, isInternalTxn) if err != nil { if !errIsNoisy(err) { logutil.Logger(ctx).Warn("commit failed", zap.String("finished txn", s.txn.GoString()), zap.Error(err)) } return err } s.updateStatsDeltaToCollector() return nil } // adds more information about the table in the error message // precondition: oldErr is a 9007:WriteConflict Error func (s *session) tryReplaceWriteConflictError(ctx context.Context, oldErr error) (newErr error) { if !kv.ErrWriteConflict.Equal(oldErr) { return nil } if errors.RedactLogEnabled.Load() == errors.RedactLogEnable { return nil } originErr := errors.Cause(oldErr) inErr, _ := originErr.(*errors.Error) // we don't want to modify the oldErr, so copy the args list oldArgs := inErr.Args() args := slices.Clone(oldArgs) is := sessiontxn.GetTxnManager(s).GetTxnInfoSchema() if is == nil { return nil } newKeyTableField, ok := addTableNameInTableIDField(ctx, args[3], is) if ok { args[3] = newKeyTableField } newPrimaryKeyTableField, ok := addTableNameInTableIDField(ctx, args[5], is) if ok { args[5] = newPrimaryKeyTableField } return kv.ErrWriteConflict.FastGenByArgs(args...) } // precondition: is != nil func addTableNameInTableIDField(ctx context.Context, tableIDField any, is infoschema.InfoSchema) (enhancedMsg string, done bool) { keyTableID, ok := tableIDField.(string) if !ok { return "", false } stringsInTableIDField := strings.Split(keyTableID, "=") if len(stringsInTableIDField) == 0 { return "", false } tableIDStr := stringsInTableIDField[len(stringsInTableIDField)-1] tableID, err := strconv.ParseInt(tableIDStr, 10, 64) if err != nil { return "", false } var tableName string tbl, ok := is.TableByID(ctx, tableID) if !ok { tableName = "unknown" } else { dbInfo, ok := infoschema.SchemaByTable(is, tbl.Meta()) if !ok { tableName = "unknown." + tbl.Meta().Name.String() } else { tableName = dbInfo.Name.String() + "." + tbl.Meta().Name.String() } } enhancedMsg = keyTableID + ", tableName=" + tableName return enhancedMsg, true } func (s *session) updateStatsDeltaToCollector() { mapper := s.GetSessionVars().TxnCtx.TableDeltaMap if s.statsCollector != nil && mapper != nil { for _, item := range mapper { if item.TableID > 0 { s.statsCollector.Update(item.TableID, item.Delta, item.Count) } } } } func (s *session) CommitTxn(ctx context.Context) error { r, ctx := tracing.StartRegionEx(ctx, "session.CommitTxn") defer r.End() s.setLastTxnInfoBeforeTxnEnd() var commitDetail *tikvutil.CommitDetails ctx = context.WithValue(ctx, tikvutil.CommitDetailCtxKey, &commitDetail) err := s.doCommitWithRetry(ctx) if commitDetail != nil { s.sessionVars.StmtCtx.MergeExecDetails(commitDetail) } if err == nil && s.txn.lastCommitTS > 0 { // lastCommitTS could be the same, e.g. when the txn is considered readonly if s.txn.lastCommitTS < s.sessionVars.LastCommitTS { logutil.BgLogger().Error("check lastCommitTS failed", zap.Uint64("sessionLastCommitTS", s.sessionVars.LastCommitTS), zap.Uint64("txnLastCommitTS", s.txn.lastCommitTS), zap.String("sql", redact.String(s.sessionVars.EnableRedactLog, s.sessionVars.StmtCtx.OriginalSQL)), ) return fmt.Errorf("txn commit_ts:%d is before session last_commit_ts:%d", s.txn.lastCommitTS, s.sessionVars.LastCommitTS) } s.sessionVars.LastCommitTS = s.txn.lastCommitTS } // record the TTLInsertRows in the metric metrics.TTLInsertRowsCount.Add(float64(s.sessionVars.TxnCtx.InsertTTLRowsCount)) metrics.DDLCommitTempIndexWrite(s.sessionVars.ConnectionID) failpoint.Inject("keepHistory", func(val failpoint.Value) { if val.(bool) { failpoint.Return(err) } }) s.sessionVars.TxnCtx.Cleanup() s.sessionVars.CleanupTxnReadTSIfUsed() return err } func (s *session) RollbackTxn(ctx context.Context) { r, ctx := tracing.StartRegionEx(ctx, "session.RollbackTxn") defer r.End() // Emit txn.rollback trace event if traceevent.IsEnabled(traceevent.TxnLifecycle) { startTS := s.sessionVars.TxnCtx.StartTS stmtCount := uint64(s.sessionVars.TxnCtx.StatementCount) traceevent.TraceEvent(ctx, traceevent.TxnLifecycle, "txn.rollback", zap.Uint64("start_ts", startTS), zap.Uint64("stmt_count", stmtCount), ) } s.setLastTxnInfoBeforeTxnEnd() if s.txn.Valid() { terror.Log(s.txn.Rollback()) } if ctx.Value(inCloseSession{}) == nil { s.cleanRetryInfo() } s.txn.changeToInvalid() s.sessionVars.TxnCtx.Cleanup() s.sessionVars.CleanupTxnReadTSIfUsed() s.sessionVars.SetInTxn(false) sessiontxn.GetTxnManager(s).OnTxnEnd() metrics.DDLRollbackTempIndexWrite(s.sessionVars.ConnectionID) } // setLastTxnInfoBeforeTxnEnd sets the @@last_txn_info variable before commit/rollback the transaction. // The `LastTxnInfo` updated with a JSON string that contains start_ts, for_update_ts, etc. // The `LastTxnInfo` is updated without the `commit_ts` fields because it is unknown // until the commit is done (or do not need to commit for readonly or a rollback transaction). // The non-readonly transaction will overwrite the `LastTxnInfo` again after commit to update the `commit_ts` field. func (s *session) setLastTxnInfoBeforeTxnEnd() { txnCtx := s.GetSessionVars().TxnCtx if txnCtx.StartTS == 0 { // If the txn is not active, for example, executing "SELECT 1", skip setting the last txn info. return } lastTxnInfo, err := json.Marshal(transaction.TxnInfo{ TxnScope: txnCtx.TxnScope, StartTS: txnCtx.StartTS, }) terror.Log(err) s.GetSessionVars().LastTxnInfo = string(lastTxnInfo) } func (s *session) GetClient() kv.Client { return s.client } func (s *session) GetMPPClient() kv.MPPClient { return s.mppClient } func (s *session) String() string { // TODO: how to print binded context in values appropriately? sessVars := s.sessionVars data := map[string]any{ "id": sessVars.ConnectionID, "user": sessVars.User, "currDBName": sessVars.CurrentDB, "status": sessVars.Status(), "strictMode": sessVars.SQLMode.HasStrictMode(), } if s.txn.Valid() { // if txn is committed or rolled back, txn is nil. data["txn"] = s.txn.String() } if sessVars.SnapshotTS != 0 { data["snapshotTS"] = sessVars.SnapshotTS } if sessVars.StmtCtx.LastInsertID > 0 { data["lastInsertID"] = sessVars.StmtCtx.LastInsertID } if len(sessVars.PreparedStmts) > 0 { data["preparedStmtCount"] = len(sessVars.PreparedStmts) } b, err := json.MarshalIndent(data, "", " ") terror.Log(errors.Trace(err)) return string(b) } const sqlLogMaxLen = 1024 // SchemaChangedWithoutRetry is used for testing. var SchemaChangedWithoutRetry uint32 func (s *session) GetSQLLabel() string { if s.sessionVars.InRestrictedSQL { return metrics.LblInternal } return metrics.LblGeneral } func (s *session) isInternal() bool { return s.sessionVars.InRestrictedSQL } func (*session) isTxnRetryableError(err error) bool { if atomic.LoadUint32(&SchemaChangedWithoutRetry) == 1 { return kv.IsTxnRetryableError(err) } return kv.IsTxnRetryableError(err) || domain.ErrInfoSchemaChanged.Equal(err) } func isEndTxnStmt(stmt ast.StmtNode, vars *variable.SessionVars) (bool, error) { switch n := stmt.(type) { case *ast.RollbackStmt, *ast.CommitStmt: return true, nil case *ast.ExecuteStmt: ps, err := plannercore.GetPreparedStmt(n, vars) if err != nil { return false, err } return isEndTxnStmt(ps.PreparedAst.Stmt, vars) } return false, nil } func (s *session) checkTxnAborted(stmt sqlexec.Statement) error { if atomic.LoadUint32(&s.GetSessionVars().TxnCtx.LockExpire) == 0 { return nil } // If the transaction is aborted, the following statements do not need to execute, except `commit` and `rollback`, // because they are used to finish the aborted transaction. if ok, err := isEndTxnStmt(stmt.(*executor.ExecStmt).StmtNode, s.sessionVars); err == nil && ok { return nil } else if err != nil { return err } return kv.ErrLockExpire } func (s *session) retry(ctx context.Context, maxCnt uint) (err error) { var retryCnt uint defer func() { s.sessionVars.RetryInfo.Retrying = false // retryCnt only increments on retryable error, so +1 here. if s.sessionVars.InRestrictedSQL { session_metrics.TransactionRetryInternal.Observe(float64(retryCnt + 1)) } else { session_metrics.TransactionRetryGeneral.Observe(float64(retryCnt + 1)) } s.sessionVars.SetInTxn(false) if err != nil { s.RollbackTxn(ctx) } s.txn.changeToInvalid() }() connID := s.sessionVars.ConnectionID s.sessionVars.RetryInfo.Retrying = true if atomic.LoadUint32(&s.sessionVars.TxnCtx.ForUpdate) == 1 { err = ErrForUpdateCantRetry.GenWithStackByArgs(connID) return err } nh := GetHistory(s) var schemaVersion int64 sessVars := s.GetSessionVars() orgStartTS := sessVars.TxnCtx.StartTS label := s.GetSQLLabel() for { if err = s.PrepareTxnCtx(ctx, nil); err != nil { return err } s.sessionVars.RetryInfo.ResetOffset() for i, sr := range nh.history { st := sr.st s.sessionVars.StmtCtx = sr.stmtCtx s.sessionVars.StmtCtx.CTEStorageMap = map[int]*executor.CTEStorages{} s.sessionVars.StmtCtx.ResetForRetry() s.sessionVars.PlanCacheParams.Reset() schemaVersion, err = st.RebuildPlan(ctx) if err != nil { return err } if retryCnt == 0 { // We do not have to log the query every time. // We print the queries at the first try only. sql := sqlForLog(st.GetTextToLog(false)) if sessVars.EnableRedactLog != errors.RedactLogEnable { sql += redact.String(sessVars.EnableRedactLog, sessVars.PlanCacheParams.String()) } logutil.Logger(ctx).Warn("retrying", zap.Int64("schemaVersion", schemaVersion), zap.Uint("retryCnt", retryCnt), zap.Int("queryNum", i), zap.String("sql", sql)) } else { logutil.Logger(ctx).Warn("retrying", zap.Int64("schemaVersion", schemaVersion), zap.Uint("retryCnt", retryCnt), zap.Int("queryNum", i)) } _, digest := s.sessionVars.StmtCtx.SQLDigest() s.txn.onStmtStart(digest.String()) if err = sessiontxn.GetTxnManager(s).OnStmtStart(ctx, st.GetStmtNode()); err == nil { _, err = st.Exec(ctx) } s.txn.onStmtEnd() if err != nil { s.StmtRollback(ctx, false) break } s.StmtCommit(ctx) } logutil.Logger(ctx).Warn("transaction association", zap.Uint64("retrying txnStartTS", s.GetSessionVars().TxnCtx.StartTS), zap.Uint64("original txnStartTS", orgStartTS)) failpoint.Inject("preCommitHook", func() { hook, ok := ctx.Value("__preCommitHook").(func()) if ok { hook() } }) if err == nil { err = s.doCommit(ctx) if err == nil { break } } if !s.isTxnRetryableError(err) { logutil.Logger(ctx).Warn("sql", zap.String("label", label), zap.Stringer("session", s), zap.Error(err)) metrics.SessionRetryErrorCounter.WithLabelValues(label, metrics.LblUnretryable).Inc() return err } retryCnt++ if retryCnt >= maxCnt { logutil.Logger(ctx).Warn("sql", zap.String("label", label), zap.Uint("retry reached max count", retryCnt)) metrics.SessionRetryErrorCounter.WithLabelValues(label, metrics.LblReachMax).Inc() return err } logutil.Logger(ctx).Warn("sql", zap.String("label", label), zap.Error(err), zap.String("txn", s.txn.GoString())) kv.BackOff(retryCnt) s.txn.changeToInvalid() s.sessionVars.SetInTxn(false) } return err } func sqlForLog(sql string) string { if len(sql) > sqlLogMaxLen { sql = sql[:sqlLogMaxLen] + fmt.Sprintf("(len:%d)", len(sql)) } return executor.QueryReplacer.Replace(sql) } func (s *session) sysSessionPool() util.SessionPool { return domain.GetDomain(s).SysSessionPool() } func getSessionFactory(store kv.Storage) pools.Factory { facWithDom := getSessionFactoryInternal(store, func(store kv.Storage, _ *domain.Domain) (*session, error) { return createSession(store) }) return func() (pools.Resource, error) { return facWithDom(nil) } } func getSessionFactoryWithDom(store kv.Storage) func(*domain.Domain) (pools.Resource, error) { return getSessionFactoryInternal(store, CreateSessionWithDomain) } func getCrossKSSessionFactory(currKSStore kv.Storage, targetKS string, schemaValidator validatorapi.Validator) pools.Factory { facWithDom := getSessionFactoryInternal(currKSStore, func(store kv.Storage, _ *domain.Domain) (*session, error) { return createCrossKSSession(store, targetKS, schemaValidator) }) return func() (pools.Resource, error) { return facWithDom(nil) } } func getSessionFactoryInternal(store kv.Storage, createSessFn func(store kv.Storage, dom *domain.Domain) (*session, error)) func(*domain.Domain) (pools.Resource, error) { return func(dom *domain.Domain) (pools.Resource, error) { se, err := createSessFn(store, dom) if err != nil { return nil, err } err = se.sessionVars.SetSystemVar(vardef.AutoCommit, "1") if err != nil { return nil, err } err = se.sessionVars.SetSystemVar(vardef.MaxExecutionTime, "0") if err != nil { return nil, errors.Trace(err) } err = se.sessionVars.SetSystemVar(vardef.MaxAllowedPacket, strconv.FormatUint(vardef.DefMaxAllowedPacket, 10)) if err != nil { return nil, errors.Trace(err) } err = se.sessionVars.SetSystemVar(vardef.TiDBConstraintCheckInPlacePessimistic, vardef.On) if err != nil { return nil, errors.Trace(err) } se.sessionVars.CommonGlobalLoaded = true se.sessionVars.InRestrictedSQL = true // Internal session uses default format to prevent memory leak problem. se.sessionVars.EnableChunkRPC = false return se, nil } } func drainRecordSet(ctx context.Context, se *session, rs sqlexec.RecordSet, alloc chunk.Allocator) ([]chunk.Row, error) { var rows []chunk.Row var req *chunk.Chunk req = rs.NewChunk(alloc) for { err := rs.Next(ctx, req) if err != nil || req.NumRows() == 0 { return rows, err } iter := chunk.NewIterator4Chunk(req) for r := iter.Begin(); r != iter.End(); r = iter.Next() { rows = append(rows, r) } req = chunk.Renew(req, se.sessionVars.MaxChunkSize) } } // getTableValue executes restricted sql and the result is one column. // It returns a string value. func (s *session) getTableValue(ctx context.Context, tblName string, varName string) (string, error) { if ctx.Value(kv.RequestSourceKey) == nil { ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnSysVar) } rows, fields, err := s.ExecRestrictedSQL(ctx, nil, "SELECT VARIABLE_VALUE FROM %n.%n WHERE VARIABLE_NAME=%?", mysql.SystemDB, tblName, varName) if err != nil { return "", err } if len(rows) == 0 { return "", errResultIsEmpty } d := rows[0].GetDatum(0, &fields[0].Column.FieldType) value, err := d.ToString() if err != nil { return "", err } return value, nil } // replaceGlobalVariablesTableValue executes restricted sql updates the variable value // It will then notify the etcd channel that the value has changed. func (s *session) replaceGlobalVariablesTableValue(ctx context.Context, varName, val string, updateLocal bool) error { ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnSysVar) _, _, err := s.ExecRestrictedSQL(ctx, nil, `REPLACE INTO %n.%n (variable_name, variable_value) VALUES (%?, %?)`, mysql.SystemDB, mysql.GlobalVariablesTable, varName, val) if err != nil { return err } domain.GetDomain(s).NotifyUpdateSysVarCache(updateLocal) return err } // GetGlobalSysVar implements GlobalVarAccessor.GetGlobalSysVar interface. func (s *session) GetGlobalSysVar(name string) (string, error) { if s.Value(sessionctx.Initing) != nil { // When running bootstrap or upgrade, we should not access global storage. return "", nil } sv := variable.GetSysVar(name) if sv == nil { // It might be a recently unregistered sysvar. We should return unknown // since GetSysVar is the canonical version, but we can update the cache // so the next request doesn't attempt to load this. logutil.BgLogger().Info("sysvar does not exist. sysvar cache may be stale", zap.String("name", name)) return "", variable.ErrUnknownSystemVar.GenWithStackByArgs(name) } sysVar, err := domain.GetDomain(s).GetGlobalVar(name) if err != nil { // The sysvar exists, but there is no cache entry yet. // This might be because the sysvar was only recently registered. // In which case it is safe to return the default, but we can also // update the cache for the future. logutil.BgLogger().Info("sysvar not in cache yet. sysvar cache may be stale", zap.String("name", name)) sysVar, err = s.getTableValue(context.TODO(), mysql.GlobalVariablesTable, name) if err != nil { return sv.Value, nil } } // It might have been written from an earlier TiDB version, so we should do type validation // See https://github.com/pingcap/tidb/issues/30255 for why we don't do full validation. // If validation fails, we should return the default value: // See: https://github.com/pingcap/tidb/pull/31566 sysVar, err = sv.ValidateFromType(s.GetSessionVars(), sysVar, vardef.ScopeGlobal) if err != nil { return sv.Value, nil } return sysVar, nil } // SetGlobalSysVar implements GlobalVarAccessor.SetGlobalSysVar interface. func (s *session) SetGlobalSysVar(ctx context.Context, name string, value string) (err error) { sv := variable.GetSysVar(name) if sv == nil { return variable.ErrUnknownSystemVar.GenWithStackByArgs(name) } if value, err = sv.Validate(s.sessionVars, value, vardef.ScopeGlobal); err != nil { return err } if err = sv.SetGlobalFromHook(ctx, s.sessionVars, value, false); err != nil { return err } if sv.GlobalConfigName != "" { domain.GetDomain(s).NotifyGlobalConfigChange(sv.GlobalConfigName, variable.OnOffToTrueFalse(value)) } return s.replaceGlobalVariablesTableValue(context.TODO(), sv.Name, value, true) } // SetGlobalSysVarOnly updates the sysvar, but does not call the validation function or update aliases. // This is helpful to prevent duplicate warnings being appended from aliases, or recursion. // updateLocal indicates whether to rebuild the local SysVar Cache. This is helpful to prevent recursion. func (s *session) SetGlobalSysVarOnly(ctx context.Context, name string, value string, updateLocal bool) (err error) { sv := variable.GetSysVar(name) if sv == nil { return variable.ErrUnknownSystemVar.GenWithStackByArgs(name) } if err = sv.SetGlobalFromHook(ctx, s.sessionVars, value, true); err != nil { return err } return s.replaceGlobalVariablesTableValue(ctx, sv.Name, value, updateLocal) } // SetInstanceSysVar implements InstanceVarAccessor.SetInstanceSysVar interface. func (s *session) SetInstanceSysVar(ctx context.Context, name string, value string) (err error) { sv := variable.GetSysVar(name) if sv == nil { return variable.ErrUnknownSystemVar.GenWithStackByArgs(name) } if value, err = sv.Validate(s.sessionVars, value, vardef.ScopeInstance); err != nil { return err } return sv.SetGlobalFromHook(ctx, s.sessionVars, value, false) } // SetTiDBTableValue implements GlobalVarAccessor.SetTiDBTableValue interface. func (s *session) SetTiDBTableValue(name, value, comment string) error { ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnSysVar) _, _, err := s.ExecRestrictedSQL(ctx, nil, `REPLACE INTO mysql.tidb (variable_name, variable_value, comment) VALUES (%?, %?, %?)`, name, value, comment) return err } // GetTiDBTableValue implements GlobalVarAccessor.GetTiDBTableValue interface. func (s *session) GetTiDBTableValue(name string) (string, error) { return s.getTableValue(context.TODO(), mysql.TiDBTable, name) } var _ sqlexec.SQLParser = &session{} const ( coreSQLToken = 1 << iota bypassSQLToken isSelectSQLToken defOOMRiskCheckDur = time.Millisecond * 100 // 100ms: sleep duration when mem-arbitrator is at memory risk defSuffixSplitDot = ", " defSuffixParseSQL = defSuffixSplitDot + "path=ParseSQL" defSuffixCompilePlan = defSuffixSplitDot + "path=CompilePlan" // mem quota for compiling plan per token. // 1. prepare tpc-c // 2. run tpc-c workload with multiple threads for a few minutes // 3. observe the memory consumption of TiDB instance without copr-cache // 4. calculate the average memory consumption of compiling plan per token: // executor.(*Compiler).Compile / threads / avg token count per SQL / 2 * 1.2(more 20%) defCompilePlanQuotaPerToken = 63091 * 12 / 10 // mem quota for parsing SQL per token (similar method as above) // session.(*session).ParseSQL / threads / avg token count per SQL / 2 * 1.2(more 20%) defParseSQLQuotaPerToken = 12036 * 12 / 10 ) var keySQLToken = map[string]int{ "select": isSelectSQLToken, "from": coreSQLToken, "insert": coreSQLToken, "update": coreSQLToken, "delete": coreSQLToken, "replace": coreSQLToken, "analyze": coreSQLToken, "execute": coreSQLToken, // ignore `prepare` statement because its content will be parsed later "explain": bypassSQLToken, "desc": bypassSQLToken} // approximate memory quota related token count for parsing a SQL statement which covers most DML statements // 1. ignore comments // 2. count keywords, identifiers, numbers, "?", string/identifier literals as one token // 3. if the SQL has `select` clause, it must have `from` clause: ignore SQL like `select expr()` or `select @@var` // 4. return 0 if the SQL has NO core token (e.g. `set`, `use`, `begin`, `commit`, `rollback`, etc) func approxParseSQLTokenCnt(sql string) (tokenCnt int64) { f := false buffer := struct { d [10]byte n int }{} hitCoreToken := false hasSelect := false for i := 0; i < len(sql); i++ { c := sql[i] if 'A' <= c && c <= 'Z' { c += 'a' - 'A' } if 'a' <= c && c <= 'z' || '0' <= c && c <= '9' || c == '_' { f = true if !hitCoreToken { if buffer.n < len(buffer.d) { buffer.d[buffer.n] = c buffer.n++ } } continue } if f { f = false tokenCnt++ if !hitCoreToken { token := keySQLToken[string(buffer.d[:buffer.n])] if token&isSelectSQLToken > 0 { hasSelect = true } else if token&coreSQLToken > 0 { hitCoreToken = true } else if token&bypassSQLToken == 0 { if !hasSelect { return 0 } // expect `from` after `select` } buffer.n = 0 } } if sql[i] == '/' && i+1 < len(sql) && sql[i+1] == '*' { i += 2 // skip "/*" for i+1 < len(sql) && !(sql[i] == '*' && sql[i+1] == '/') { i++ } i++ // skip "*/" continue } if sql[i] == '-' && i+1 < len(sql) && sql[i+1] == '-' { i += 2 // skip "--" for i < len(sql) && sql[i] != '\n' { i++ } continue } if sql[i] == '#' { i++ // skip "#" for i < len(sql) && sql[i] != '\n' { i++ } continue } if sql[i] == '"' || sql[i] == '\'' { quote := sql[i] i++ // skip quote for i < len(sql) && sql[i] != quote { if sql[i] == '\\' && i+1 < len(sql) { i++ // skip escape character } i++ } tokenCnt++ continue } if sql[i] == '`' { i++ // skip "`" for i < len(sql) && sql[i] != '`' { if sql[i] == '\\' && i+1 < len(sql) { i++ // skip escape character } i++ } tokenCnt++ continue } if sql[i] == '?' { tokenCnt++ continue } } if f { tokenCnt++ } if !hitCoreToken { return 0 } return } // approximate memory quota related token count for compiling plan of a `Normalized` SQL statement // if the SQL has `select` clause, it must have `from` clause. func approxCompilePlanTokenCnt(sql string, hasSelect bool) (tokenCnt int64) { const tokenFrom = "from" const lenTokenFrom = len(tokenFrom) n := 0 hasSelectFrom := false for i, c := range sql { if 'a' <= c && c <= 'z' || '0' <= c && c <= '9' || c == '_' || c == '`' || c == '.' { n++ continue } if n > 0 { tokenCnt++ if hasSelect && !hasSelectFrom && n == lenTokenFrom && sql[i-lenTokenFrom:i] == tokenFrom { hasSelectFrom = true } n = 0 } if c == '?' { tokenCnt++ continue } } if n > 0 { tokenCnt++ } if hasSelect && !hasSelectFrom { return 0 } return } // approximate memory quota for parsing SQL func approxParseSQLMemQuota(sql string) int64 { tokenCnt := approxParseSQLTokenCnt(sql) return tokenCnt * defParseSQLQuotaPerToken } // approximate memory quota for compiling plan of a `Normalized` SQL statement func approxCompilePlanMemQuota(sql string, hasSelect bool) int64 { tokenCnt := approxCompilePlanTokenCnt(sql, hasSelect) return tokenCnt * defCompilePlanQuotaPerToken } func (s *session) ParseSQL(ctx context.Context, sql string, params ...parser.ParseParam) ([]ast.StmtNode, []error, error) { globalMemArbitrator := memory.GlobalMemArbitrator() arbitratorMode := globalMemArbitrator.WorkMode() execUseArbitrator := false parseSQLMemQuota := int64(0) if globalMemArbitrator != nil && s.sessionVars.ConnectionID != 0 { if s.sessionVars.MemArbitrator.WaitAverse != variable.MemArbitratorNolimit { parseSQLMemQuota = approxParseSQLMemQuota(sql) execUseArbitrator = parseSQLMemQuota > 0 } } if execUseArbitrator { uid := s.sessionVars.ConnectionID intoErrBeforeExec := func() error { if s.sessionVars.MemArbitrator.WaitAverse == variable.MemArbitratorWaitAverseEnable { metrics.GlobalMemArbitratorSubTasks.CancelWaitAverseParse.Inc() return exeerrors.ErrQueryExecStopped.GenWithStackByArgs(memory.ArbitratorWaitAverseCancel.String()+defSuffixParseSQL, uid) } if arbitratorMode == memory.ArbitratorModeStandard { metrics.GlobalMemArbitratorSubTasks.CancelStandardModeParse.Inc() return exeerrors.ErrQueryExecStopped.GenWithStackByArgs(memory.ArbitratorStandardCancel.String()+defSuffixParseSQL, uid) } return nil } if globalMemArbitrator.AtMemRisk() { if err := intoErrBeforeExec(); err != nil { return nil, nil, err } for globalMemArbitrator.AtMemRisk() { if globalMemArbitrator.AtOOMRisk() { metrics.GlobalMemArbitratorSubTasks.ForceKillParse.Inc() return nil, nil, exeerrors.ErrQueryExecStopped.GenWithStackByArgs(memory.ArbitratorOOMRiskKill.String()+defSuffixParseSQL, uid) } time.Sleep(defOOMRiskCheckDur) } } arbitratorOutOfQuota := !globalMemArbitrator.ConsumeQuotaFromAwaitFreePool(uid, parseSQLMemQuota) defer globalMemArbitrator.ConsumeQuotaFromAwaitFreePool(uid, -parseSQLMemQuota) if arbitratorOutOfQuota { // for SQL which needs to be controlled by mem-arbitrator if err := intoErrBeforeExec(); err != nil { return nil, nil, err } } } defer tracing.StartRegion(ctx, "ParseSQL").End() p := parserutil.GetParser() defer func() { parserutil.DestroyParser(p) }() sqlMode := s.sessionVars.SQLMode if s.isInternal() { sqlMode = mysql.DelSQLMode(sqlMode, mysql.ModeNoBackslashEscapes) } p.SetSQLMode(sqlMode) p.SetParserConfig(s.sessionVars.BuildParserConfig()) tmp, warn, err := p.ParseSQL(sql, params...) // The []ast.StmtNode is referenced by the parser, to reuse the parser, make a copy of the result. res := slices.Clone(tmp) return res, warn, err } func (s *session) SetProcessInfo(sql string, t time.Time, command byte, maxExecutionTime uint64) { // If command == mysql.ComSleep, it means the SQL execution is finished. The processinfo is reset to SLEEP. // If the SQL finished and the session is not in transaction, the current start timestamp need to reset to 0. // Otherwise, it should be set to the transaction start timestamp. // Why not reset the transaction start timestamp to 0 when transaction committed? // Because the select statement and other statements need this timestamp to read data, // after the transaction is committed. e.g. SHOW MASTER STATUS; var curTxnStartTS uint64 var curTxnCreateTime time.Time if command != mysql.ComSleep || s.GetSessionVars().InTxn() { curTxnStartTS = s.sessionVars.TxnCtx.StartTS curTxnCreateTime = s.sessionVars.TxnCtx.CreateTime // For stale read and autocommit path, the `TxnCtx.StartTS` is 0. if curTxnStartTS == 0 { curTxnStartTS = s.sessionVars.TxnCtx.StaleReadTs } } // Set curTxnStartTS to SnapshotTS directly when the session is trying to historic read. // It will avoid the session meet GC lifetime too short error. if s.GetSessionVars().SnapshotTS != 0 { curTxnStartTS = s.GetSessionVars().SnapshotTS } p := s.currentPlan if explain, ok := p.(*plannercore.Explain); ok && explain.Analyze && explain.TargetPlan != nil { p = explain.TargetPlan } sqlCPUUsages := &s.sessionVars.SQLCPUUsages // If command == mysql.ComSleep, it means the SQL execution is finished. Then cpu usages should be nil. if command == mysql.ComSleep { sqlCPUUsages = nil } pi := sessmgr.ProcessInfo{ ID: s.sessionVars.ConnectionID, Port: s.sessionVars.Port, DB: s.sessionVars.CurrentDB, Command: command, Plan: p, BriefBinaryPlan: plannercore.GetBriefBinaryPlan(p), RuntimeStatsColl: s.sessionVars.StmtCtx.RuntimeStatsColl, Time: t, State: s.Status(), Info: sql, CurTxnStartTS: curTxnStartTS, CurTxnCreateTime: curTxnCreateTime, StmtCtx: s.sessionVars.StmtCtx, SQLCPUUsage: sqlCPUUsages, RefCountOfStmtCtx: &s.sessionVars.RefCountOfStmtCtx, MemTracker: s.sessionVars.MemTracker, DiskTracker: s.sessionVars.DiskTracker, RunawayChecker: s.sessionVars.StmtCtx.RunawayChecker, StatsInfo: physicalop.GetStatsInfo, OOMAlarmVariablesInfo: s.getOomAlarmVariablesInfo(), TableIDs: s.sessionVars.StmtCtx.TableIDs, IndexNames: s.sessionVars.StmtCtx.IndexNames, MaxExecutionTime: maxExecutionTime, RedactSQL: s.sessionVars.EnableRedactLog, ResourceGroupName: s.sessionVars.StmtCtx.ResourceGroupName, SessionAlias: s.sessionVars.SessionAlias, CursorTracker: s.cursorTracker, } oldPi := s.ShowProcess() if p == nil { // Store the last valid plan when the current plan is nil. // This is for `explain for connection` statement has the ability to query the last valid plan. if oldPi != nil && oldPi.Plan != nil && len(oldPi.BriefBinaryPlan) > 0 { pi.Plan = oldPi.Plan pi.RuntimeStatsColl = oldPi.RuntimeStatsColl pi.BriefBinaryPlan = oldPi.BriefBinaryPlan } } // We set process info before building plan, so we extended execution time. if oldPi != nil && oldPi.Info == pi.Info && oldPi.Command == pi.Command { pi.Time = oldPi.Time } if oldPi != nil && oldPi.CurTxnStartTS != 0 && oldPi.CurTxnStartTS == pi.CurTxnStartTS { // Keep the last expensive txn log time, avoid print too many expensive txn logs. pi.ExpensiveTxnLogTime = oldPi.ExpensiveTxnLogTime } _, digest := s.sessionVars.StmtCtx.SQLDigest() pi.Digest = digest.String() // DO NOT reset the currentPlan to nil until this query finishes execution, otherwise reentrant calls // of SetProcessInfo would override Plan and BriefBinaryPlan to nil. if command == mysql.ComSleep { s.currentPlan = nil } if s.sessionVars.User != nil { pi.User = s.sessionVars.User.Username pi.Host = s.sessionVars.User.Hostname } s.processInfo.Store(&pi) } // UpdateProcessInfo updates the session's process info for the running statement. func (s *session) UpdateProcessInfo() { pi := s.ShowProcess() if pi == nil || pi.CurTxnStartTS != 0 { return } // do not modify this two fields in place, see issue: issues/50607 shallowCP := pi.Clone() // Update the current transaction start timestamp. shallowCP.CurTxnStartTS = s.sessionVars.TxnCtx.StartTS if shallowCP.CurTxnStartTS == 0 { // For stale read and autocommit path, the `TxnCtx.StartTS` is 0. shallowCP.CurTxnStartTS = s.sessionVars.TxnCtx.StaleReadTs } shallowCP.CurTxnCreateTime = s.sessionVars.TxnCtx.CreateTime s.processInfo.Store(shallowCP) } func (s *session) getOomAlarmVariablesInfo() sessmgr.OOMAlarmVariablesInfo { return sessmgr.OOMAlarmVariablesInfo{ SessionAnalyzeVersion: s.sessionVars.AnalyzeVersion, SessionEnabledRateLimitAction: s.sessionVars.EnabledRateLimitAction, SessionMemQuotaQuery: s.sessionVars.MemQuotaQuery, } } func (s *session) ExecuteInternal(ctx context.Context, sql string, args ...any) (rs sqlexec.RecordSet, err error) { if sink := tracing.GetSink(ctx); sink == nil { trace := traceevent.NewTrace() ctx = tracing.WithFlightRecorder(ctx, trace) defer trace.DiscardOrFlush(ctx) // A developer debugging event so we can see what trace is missing! if traceevent.IsEnabled(tracing.DevDebug) { traceevent.TraceEvent(ctx, tracing.DevDebug, "ExecuteInternal missing trace ctx", zap.String("sql", sql), zap.Stack("stack")) traceevent.CheckFlightRecorderDumpTrigger(ctx, "dump_trigger.suspicious_event.dev_debug", func(config *traceevent.DumpTriggerConfig) bool { return config.Event.DevDebug.Type == traceevent.DevDebugTypeExecuteInternalTraceMissing }) } } rs, err = s.executeInternalImpl(ctx, sql, args...) return rs, err } func (s *session) executeInternalImpl(ctx context.Context, sql string, args ...any) (rs sqlexec.RecordSet, err error) { origin := s.sessionVars.InRestrictedSQL s.sessionVars.InRestrictedSQL = true defer func() { s.sessionVars.InRestrictedSQL = origin // Restore the goroutine label by using the original ctx after execution is finished. pprof.SetGoroutineLabels(ctx) }() r, ctx := tracing.StartRegionEx(ctx, "session.ExecuteInternal") defer r.End() logutil.Eventf(ctx, "execute: %s", sql) stmtNode, err := s.ParseWithParams(ctx, sql, args...) if err != nil { return nil, err } rs, err = s.ExecuteStmt(ctx, stmtNode) if err != nil { s.sessionVars.StmtCtx.AppendError(err) } if rs == nil { return nil, err } return rs, err } // Execute is deprecated, we can remove it as soon as plugins are migrated. func (s *session) Execute(ctx context.Context, sql string) (recordSets []sqlexec.RecordSet, err error) { r, ctx := tracing.StartRegionEx(ctx, "session.Execute") defer r.End() logutil.Eventf(ctx, "execute: %s", sql) stmtNodes, err := s.Parse(ctx, sql) if err != nil { return nil, err } if len(stmtNodes) != 1 { return nil, errors.New("Execute() API doesn't support multiple statements any more") } rs, err := s.ExecuteStmt(ctx, stmtNodes[0]) if err != nil { s.sessionVars.StmtCtx.AppendError(err) } if rs == nil { return nil, err } return []sqlexec.RecordSet{rs}, err } type sqlRegexp struct { regexp string } func (s sqlRegexp) sqlRegexpDumpTriggerCheck(cfg *traceevent.DumpTriggerConfig) bool { // TODO: pre-compile the regexp to improve performance match, err := regexp.MatchString(cfg.UserCommand.SQLRegexp, s.regexp) return err == nil && match } // Parse parses a query string to raw ast.StmtNode. func (s *session) Parse(ctx context.Context, sql string) ([]ast.StmtNode, error) { logutil.Logger(ctx).Debug("parse", zap.String("sql", sql)) parseStartTime := time.Now() traceevent.CheckFlightRecorderDumpTrigger(ctx, "dump_trigger.user_command.sql_regexp", sqlRegexp{sql}.sqlRegexpDumpTriggerCheck) // Load the session variables to the context. // This is necessary for the parser to get the current sql_mode. if err := s.loadCommonGlobalVariablesIfNeeded(); err != nil { return nil, err } stmts, warns, err := s.ParseSQL(ctx, sql, s.sessionVars.GetParseParams()...) if err != nil { s.rollbackOnError(ctx) err = util.SyntaxError(err) // Only print log message when this SQL is from the user. // Mute the warning for internal SQLs. if !s.sessionVars.InRestrictedSQL { logutil.Logger(ctx).Warn("parse SQL failed", zap.Error(err), zap.String("SQL", redact.String(s.sessionVars.EnableRedactLog, sql))) s.sessionVars.StmtCtx.AppendError(err) } return nil, err } durParse := time.Since(parseStartTime) s.GetSessionVars().DurationParse = durParse isInternal := s.isInternal() if isInternal { session_metrics.SessionExecuteParseDurationInternal.Observe(durParse.Seconds()) } else { session_metrics.SessionExecuteParseDurationGeneral.Observe(durParse.Seconds()) } for _, warn := range warns { s.sessionVars.StmtCtx.AppendWarning(util.SyntaxWarn(warn)) } return stmts, nil } // ParseWithParams parses a query string, with arguments, to raw ast.StmtNode. // Note that it will not do escaping if no variable arguments are passed. func (s *session) ParseWithParams(ctx context.Context, sql string, args ...any) (ast.StmtNode, error) { var err error if len(args) > 0 { sql, err = sqlescape.EscapeSQL(sql, args...) if err != nil { return nil, err } } internal := s.isInternal() var stmts []ast.StmtNode var warns []error parseStartTime := time.Now() if internal { // Do no respect the settings from clients, if it is for internal usage. // Charsets from clients may give chance injections. // Refer to https://stackoverflow.com/questions/5741187/sql-injection-that-gets-around-mysql-real-escape-string/12118602. stmts, warns, err = s.ParseSQL(ctx, sql) } else { stmts, warns, err = s.ParseSQL(ctx, sql, s.sessionVars.GetParseParams()...) } if len(stmts) != 1 && err == nil { err = errors.New("run multiple statements internally is not supported") } if err != nil { s.rollbackOnError(ctx) logSQL := sql[:min(500, len(sql))] logutil.Logger(ctx).Warn("parse SQL failed", zap.Error(err), zap.String("SQL", redact.String(s.sessionVars.EnableRedactLog, logSQL))) return nil, util.SyntaxError(err) } durParse := time.Since(parseStartTime) if internal { session_metrics.SessionExecuteParseDurationInternal.Observe(durParse.Seconds()) } else { session_metrics.SessionExecuteParseDurationGeneral.Observe(durParse.Seconds()) } for _, warn := range warns { s.sessionVars.StmtCtx.AppendWarning(util.SyntaxWarn(warn)) } if topsqlstate.TopSQLEnabled() { normalized, digest := parser.NormalizeDigest(sql) if digest != nil { // Reset the goroutine label when internal sql execute finish. // Specifically reset in ExecRestrictedStmt function. s.sessionVars.StmtCtx.IsSQLRegistered.Store(true) topsql.AttachAndRegisterSQLInfo(ctx, normalized, digest, s.sessionVars.InRestrictedSQL) } } return stmts[0], nil } // GetAdvisoryLock acquires an advisory lock of lockName. // Note that a lock can be acquired multiple times by the same session, // in which case we increment a reference count. // Each lock needs to be held in a unique session because // we need to be able to ROLLBACK in any arbitrary order // in order to release the locks. func (s *session) GetAdvisoryLock(lockName string, timeout int64) error { if lock, ok := s.advisoryLocks[lockName]; ok { lock.IncrReferences() return nil } se, clean, err := s.getInternalSession(sqlexec.GetExecOption(nil)) if err != nil { return err } lock := &advisoryLock{session: se, ctx: context.TODO(), owner: s.ShowProcess().ID, clean: clean} err = lock.GetLock(lockName, timeout) if err != nil { return err } s.advisoryLocks[lockName] = lock return nil } // IsUsedAdvisoryLock checks if a lockName is already in use func (s *session) IsUsedAdvisoryLock(lockName string) uint64 { // Same session if lock, ok := s.advisoryLocks[lockName]; ok { return lock.owner } // Check for transaction on advisory_locks table se, clean, err := s.getInternalSession(sqlexec.GetExecOption(nil)) if err != nil { return 0 } lock := &advisoryLock{session: se, ctx: context.TODO(), owner: s.ShowProcess().ID, clean: clean} err = lock.IsUsedLock(lockName) if err != nil { // TODO: Return actual owner pid // TODO: Check for mysql.ErrLockWaitTimeout and DeadLock return 1 } return 0 } // ReleaseAdvisoryLock releases an advisory locks held by the session. // It returns FALSE if no lock by this name was held (by this session), // and TRUE if a lock was held and "released". // Note that the lock is not actually released if there are multiple // references to the same lockName by the session, instead the reference // count is decremented. func (s *session) ReleaseAdvisoryLock(lockName string) (released bool) { if lock, ok := s.advisoryLocks[lockName]; ok { lock.DecrReferences() if lock.ReferenceCount() <= 0 { lock.Close() delete(s.advisoryLocks, lockName) } return true } return false } // ReleaseAllAdvisoryLocks releases all advisory locks held by the session // and returns a count of the locks that were released. // The count is based on unique locks held, so multiple references // to the same lock do not need to be accounted for. func (s *session) ReleaseAllAdvisoryLocks() int { var count int for lockName, lock := range s.advisoryLocks { lock.Close() count += lock.ReferenceCount() delete(s.advisoryLocks, lockName) } return count } // GetExtensions returns the `*extension.SessionExtensions` object func (s *session) GetExtensions() *extension.SessionExtensions { return s.extensions } // SetExtensions sets the `*extension.SessionExtensions` object func (s *session) SetExtensions(extensions *extension.SessionExtensions) { s.extensions = extensions } // InSandBoxMode indicates that this session is in sandbox mode func (s *session) InSandBoxMode() bool { return s.sandBoxMode } // EnableSandBoxMode enable the sandbox mode. func (s *session) EnableSandBoxMode() { s.sandBoxMode = true } // DisableSandBoxMode enable the sandbox mode. func (s *session) DisableSandBoxMode() { s.sandBoxMode = false } // ParseWithParams4Test wrapper (s *session) ParseWithParams for test func ParseWithParams4Test(ctx context.Context, s sessionapi.Session, sql string, args ...any) (ast.StmtNode, error) { return s.(*session).ParseWithParams(ctx, sql, args) } var _ sqlexec.RestrictedSQLExecutor = &session{} var _ sqlexec.SQLExecutor = &session{} // ExecRestrictedStmt implements RestrictedSQLExecutor interface. func (s *session) ExecRestrictedStmt(ctx context.Context, stmtNode ast.StmtNode, opts ...sqlexec.OptionFuncAlias) ( []chunk.Row, []*resolve.ResultField, error) { defer pprof.SetGoroutineLabels(ctx) execOption := sqlexec.GetExecOption(opts) var se *session var clean func() var err error if execOption.UseCurSession { se, clean, err = s.useCurrentSession(execOption) } else { se, clean, err = s.getInternalSession(execOption) } if err != nil { return nil, nil, err } defer clean() startTime := time.Now() metrics.SessionRestrictedSQLCounter.Inc() ctx = execdetails.ContextWithInitializedExecDetails(ctx) rs, err := se.ExecuteStmt(ctx, stmtNode) if err != nil { se.sessionVars.StmtCtx.AppendError(err) } if rs == nil { return nil, nil, err } defer func() { if closeErr := rs.Close(); closeErr != nil { err = closeErr } }() var rows []chunk.Row rows, err = drainRecordSet(ctx, se, rs, nil) if err != nil { return nil, nil, err } vars := se.GetSessionVars() for _, dbName := range GetDBNames(vars) { metrics.QueryDurationHistogram.WithLabelValues(metrics.LblInternal, dbName, vars.StmtCtx.ResourceGroupName).Observe(time.Since(startTime).Seconds()) } return rows, rs.Fields(), err } // ExecRestrictedStmt4Test wrapper `(s *session) ExecRestrictedStmt` for test. func ExecRestrictedStmt4Test(ctx context.Context, s sessionapi.Session, stmtNode ast.StmtNode, opts ...sqlexec.OptionFuncAlias) ( []chunk.Row, []*resolve.ResultField, error) { ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnOthers) return s.(*session).ExecRestrictedStmt(ctx, stmtNode, opts...) } // only set and clean session with execOption func (s *session) useCurrentSession(execOption sqlexec.ExecOption) (*session, func(), error) { var err error orgSnapshotInfoSchema, orgSnapshotTS := s.sessionVars.SnapshotInfoschema, s.sessionVars.SnapshotTS if execOption.SnapshotTS != 0 { if err = s.sessionVars.SetSystemVar(vardef.TiDBSnapshot, strconv.FormatUint(execOption.SnapshotTS, 10)); err != nil { return nil, nil, err } s.sessionVars.SnapshotInfoschema, err = getSnapshotInfoSchema(s, execOption.SnapshotTS) if err != nil { return nil, nil, err } } prevStatsVer := s.sessionVars.AnalyzeVersion if execOption.AnalyzeVer != 0 { s.sessionVars.AnalyzeVersion = execOption.AnalyzeVer } prevAnalyzeSnapshot := s.sessionVars.EnableAnalyzeSnapshot if execOption.AnalyzeSnapshot != nil { s.sessionVars.EnableAnalyzeSnapshot = *execOption.AnalyzeSnapshot } s.sessionVars.EnableDDLAnalyzeExecOpt = execOption.EnableDDLAnalyze prePruneMode := s.sessionVars.PartitionPruneMode.Load() if len(execOption.PartitionPruneMode) > 0 { s.sessionVars.PartitionPruneMode.Store(execOption.PartitionPruneMode) } prevSQL := s.sessionVars.StmtCtx.OriginalSQL prevStmtType := s.sessionVars.StmtCtx.StmtType prevTables := s.sessionVars.StmtCtx.Tables return s, func() { s.sessionVars.AnalyzeVersion = prevStatsVer s.sessionVars.EnableAnalyzeSnapshot = prevAnalyzeSnapshot if err := s.sessionVars.SetSystemVar(vardef.TiDBSnapshot, ""); err != nil { logutil.BgLogger().Error("set tidbSnapshot error", zap.Error(err)) } s.sessionVars.SnapshotInfoschema = orgSnapshotInfoSchema s.sessionVars.SnapshotTS = orgSnapshotTS s.sessionVars.PartitionPruneMode.Store(prePruneMode) s.sessionVars.StmtCtx.OriginalSQL = prevSQL s.sessionVars.StmtCtx.StmtType = prevStmtType s.sessionVars.StmtCtx.Tables = prevTables s.sessionVars.MemTracker.Detach() }, nil } func (s *session) getInternalSession(execOption sqlexec.ExecOption) (*session, func(), error) { tmp, err := s.sysSessionPool().Get() if err != nil { return nil, nil, errors.Trace(err) } se := tmp.(*session) // The special session will share the `InspectionTableCache` with current session // if the current session in inspection mode. if cache := s.sessionVars.InspectionTableCache; cache != nil { se.sessionVars.InspectionTableCache = cache } se.sessionVars.OptimizerUseInvisibleIndexes = s.sessionVars.OptimizerUseInvisibleIndexes preSkipStats := s.sessionVars.SkipMissingPartitionStats se.sessionVars.SkipMissingPartitionStats = s.sessionVars.SkipMissingPartitionStats if execOption.SnapshotTS != 0 { if err := se.sessionVars.SetSystemVar(vardef.TiDBSnapshot, strconv.FormatUint(execOption.SnapshotTS, 10)); err != nil { return nil, nil, err } se.sessionVars.SnapshotInfoschema, err = getSnapshotInfoSchema(s, execOption.SnapshotTS) if err != nil { return nil, nil, err } } prevStatsVer := se.sessionVars.AnalyzeVersion if execOption.AnalyzeVer != 0 { se.sessionVars.AnalyzeVersion = execOption.AnalyzeVer } prevAnalyzeSnapshot := se.sessionVars.EnableAnalyzeSnapshot if execOption.AnalyzeSnapshot != nil { se.sessionVars.EnableAnalyzeSnapshot = *execOption.AnalyzeSnapshot } prePruneMode := se.sessionVars.PartitionPruneMode.Load() if len(execOption.PartitionPruneMode) > 0 { se.sessionVars.PartitionPruneMode.Store(execOption.PartitionPruneMode) } se.sessionVars.EnableDDLAnalyzeExecOpt = execOption.EnableDDLAnalyze return se, func() { se.sessionVars.AnalyzeVersion = prevStatsVer se.sessionVars.EnableAnalyzeSnapshot = prevAnalyzeSnapshot if err := se.sessionVars.SetSystemVar(vardef.TiDBSnapshot, ""); err != nil { logutil.BgLogger().Error("set tidbSnapshot error", zap.Error(err)) } se.sessionVars.SnapshotInfoschema = nil se.sessionVars.SnapshotTS = 0 if !execOption.IgnoreWarning { if se != nil && se.GetSessionVars().StmtCtx.WarningCount() > 0 { warnings := se.GetSessionVars().StmtCtx.GetWarnings() s.GetSessionVars().StmtCtx.AppendWarnings(warnings) } } se.sessionVars.PartitionPruneMode.Store(prePruneMode) se.sessionVars.OptimizerUseInvisibleIndexes = false se.sessionVars.SkipMissingPartitionStats = preSkipStats se.sessionVars.InspectionTableCache = nil se.sessionVars.MemTracker.Detach() s.sysSessionPool().Put(tmp) }, nil } func (s *session) withRestrictedSQLExecutor(ctx context.Context, opts []sqlexec.OptionFuncAlias, fn func(context.Context, *session) ([]chunk.Row, []*resolve.ResultField, error)) ([]chunk.Row, []*resolve.ResultField, error) { execOption := sqlexec.GetExecOption(opts) var se *session var clean func() var err error if execOption.UseCurSession { se, clean, err = s.useCurrentSession(execOption) } else { se, clean, err = s.getInternalSession(execOption) } if err != nil { return nil, nil, errors.Trace(err) } defer clean() if execOption.TrackSysProcID > 0 { err = execOption.TrackSysProc(execOption.TrackSysProcID, se) if err != nil { return nil, nil, errors.Trace(err) } // unTrack should be called before clean (return sys session) defer execOption.UnTrackSysProc(execOption.TrackSysProcID) } return fn(ctx, se) } func (s *session) ExecRestrictedSQL(ctx context.Context, opts []sqlexec.OptionFuncAlias, sql string, params ...any) ([]chunk.Row, []*resolve.ResultField, error) { return s.withRestrictedSQLExecutor(ctx, opts, func(ctx context.Context, se *session) ([]chunk.Row, []*resolve.ResultField, error) { stmt, err := se.ParseWithParams(ctx, sql, params...) if err != nil { return nil, nil, errors.Trace(err) } defer pprof.SetGoroutineLabels(ctx) startTime := time.Now() metrics.SessionRestrictedSQLCounter.Inc() ctx = execdetails.ContextWithInitializedExecDetails(ctx) rs, err := se.ExecuteInternalStmt(ctx, stmt) if err != nil { se.sessionVars.StmtCtx.AppendError(err) } if rs == nil { return nil, nil, err } defer func() { if closeErr := rs.Close(); closeErr != nil { err = closeErr } }() var rows []chunk.Row rows, err = drainRecordSet(ctx, se, rs, nil) if err != nil { return nil, nil, err } vars := se.GetSessionVars() for _, dbName := range GetDBNames(vars) { metrics.QueryDurationHistogram.WithLabelValues(metrics.LblInternal, dbName, vars.StmtCtx.ResourceGroupName).Observe(time.Since(startTime).Seconds()) } return rows, rs.Fields(), err }) } // ExecuteInternalStmt execute internal stmt func (s *session) ExecuteInternalStmt(ctx context.Context, stmtNode ast.StmtNode) (sqlexec.RecordSet, error) { origin := s.sessionVars.InRestrictedSQL s.sessionVars.InRestrictedSQL = true defer func() { s.sessionVars.InRestrictedSQL = origin // Restore the goroutine label by using the original ctx after execution is finished. pprof.SetGoroutineLabels(ctx) }() return s.ExecuteStmt(ctx, stmtNode) } func queryFailDumpTriggerCheck(config *traceevent.DumpTriggerConfig) bool { return config.Event.Type == "query_fail" } type isInternalAlias struct { bool } // isInternalAlias is not intuitive, but it is defined to avoid allocation. // If the code is written as // // traceevent.CheckFlightRecorderDumpTrigger(ctx, "dump_trigger.suspicious_event.is_internal", func(conf *traceevent.DumpTriggerConfig) { // conf.Event.IsInternal = conf.Event.IsInternal // }) // // It's uncertain whether the Go compiler escape analysis is powerful enough to avoid allocation for the closure object. // isInternalAlias is defined to help the compiler, this coding style will not cause closure object allocation. // // traceevent.CheckFlightRecorderDumpTrigger(ctx, "dump_trigger.suspicious_event.is_internal", isInternalAlias{s.isInternal()}.isInternalDumpTriggerCheck) func (i isInternalAlias) isInternalDumpTriggerCheck(config *traceevent.DumpTriggerConfig) bool { return config.Event.IsInternal == i.bool } func (s *session) ExecuteStmt(ctx context.Context, stmtNode ast.StmtNode) (sqlexec.RecordSet, error) { if fr := traceevent.GetFlightRecorder(); fr != nil { traceevent.CheckFlightRecorderDumpTrigger(ctx, "dump_trigger.sampling", fr.CheckSampling) } rs, err := s.executeStmtImpl(ctx, stmtNode) if err != nil { traceevent.CheckFlightRecorderDumpTrigger(ctx, "dump_trigger.suspicious_event", queryFailDumpTriggerCheck) } return rs, err } func (s *session) executeStmtImpl(ctx context.Context, stmtNode ast.StmtNode) (sqlexec.RecordSet, error) { r, ctx := tracing.StartRegionEx(ctx, "session.ExecuteStmt") defer r.End() if err := s.PrepareTxnCtx(ctx, stmtNode); err != nil { return nil, err } if err := s.loadCommonGlobalVariablesIfNeeded(); err != nil { return nil, err } sessVars := s.sessionVars sessVars.StartTime = time.Now() traceevent.CheckFlightRecorderDumpTrigger(ctx, "dump_trigger.suspicious_event.is_internal", isInternalAlias{s.isInternal()}.isInternalDumpTriggerCheck) // Some executions are done in compile stage, so we reset them before compile. if err := executor.ResetContextOfStmt(s, stmtNode); err != nil { return nil, err } if execStmt, ok := stmtNode.(*ast.ExecuteStmt); ok { if binParam, ok := execStmt.BinaryArgs.([]param.BinaryParam); ok { args, err := expression.ExecBinaryParam(s.GetSessionVars().StmtCtx.TypeCtx(), binParam) if err != nil { return nil, err } execStmt.BinaryArgs = args } } normalizedSQL, digest := s.sessionVars.StmtCtx.SQLDigest() cmdByte := byte(atomic.LoadUint32(&s.GetSessionVars().CommandValue)) if topsqlstate.TopSQLEnabled() { s.sessionVars.StmtCtx.IsSQLRegistered.Store(true) ctx = topsql.AttachAndRegisterSQLInfo(ctx, normalizedSQL, digest, s.sessionVars.InRestrictedSQL) } if err := s.validateStatementInTxn(stmtNode); err != nil { return nil, err } if err := s.validateStatementReadOnlyInStaleness(stmtNode); err != nil { return nil, err } // Uncorrelated subqueries will execute once when building plan, so we reset process info before building plan. s.currentPlan = nil // reset current plan s.SetProcessInfo(stmtNode.Text(), time.Now(), cmdByte, 0) s.txn.onStmtStart(digest.String()) defer sessiontxn.GetTxnManager(s).OnStmtEnd() defer s.txn.onStmtEnd() if err := s.onTxnManagerStmtStartOrRetry(ctx, stmtNode); err != nil { return nil, err } failpoint.Inject("mockStmtSlow", func(val failpoint.Value) { if strings.Contains(stmtNode.Text(), "/* sleep */") { v, _ := val.(int) time.Sleep(time.Duration(v) * time.Millisecond) } }) var stmtLabel string if execStmt, ok := stmtNode.(*ast.ExecuteStmt); ok { prepareStmt, err := plannercore.GetPreparedStmt(execStmt, s.sessionVars) if err == nil && prepareStmt.PreparedAst != nil { stmtLabel = stmtctx.GetStmtLabel(ctx, prepareStmt.PreparedAst.Stmt) } } if stmtLabel == "" { stmtLabel = stmtctx.GetStmtLabel(ctx, stmtNode) } s.setRequestSource(ctx, stmtLabel, stmtNode) globalMemArbitrator := memory.GlobalMemArbitrator() arbitratorMode := globalMemArbitrator.WorkMode() enableWaitAverse := sessVars.MemArbitrator.WaitAverse == variable.MemArbitratorWaitAverseEnable execUseArbitrator := globalMemArbitrator != nil && sessVars.ConnectionID != 0 && sessVars.MemArbitrator.WaitAverse != variable.MemArbitratorNolimit && sessVars.StmtCtx.MemSensitive compilePlanMemQuota := int64(0) // mem quota for compiler & optimizer if execUseArbitrator { compilePlanMemQuota = approxCompilePlanMemQuota(normalizedSQL, sessVars.StmtCtx.InSelectStmt) execUseArbitrator = compilePlanMemQuota > 0 } releaseCommonQuota := func() { // release common quota if compilePlanMemQuota > 0 { _ = globalMemArbitrator.ConsumeQuotaFromAwaitFreePool(sessVars.ConnectionID, -compilePlanMemQuota) compilePlanMemQuota = 0 } } if execUseArbitrator { intoErrBeforeExec := func() error { if enableWaitAverse { metrics.GlobalMemArbitratorSubTasks.CancelWaitAversePlan.Inc() return exeerrors.ErrQueryExecStopped.GenWithStackByArgs(memory.ArbitratorWaitAverseCancel.String()+defSuffixCompilePlan, sessVars.ConnectionID) } if arbitratorMode == memory.ArbitratorModeStandard { metrics.GlobalMemArbitratorSubTasks.CancelStandardModePlan.Inc() return exeerrors.ErrQueryExecStopped.GenWithStackByArgs(memory.ArbitratorStandardCancel.String()+defSuffixCompilePlan, sessVars.ConnectionID) } return nil } if globalMemArbitrator.AtMemRisk() { if err := intoErrBeforeExec(); err != nil { return nil, err } for globalMemArbitrator.AtMemRisk() { if globalMemArbitrator.AtOOMRisk() { metrics.GlobalMemArbitratorSubTasks.ForceKillPlan.Inc() return nil, exeerrors.ErrQueryExecStopped.GenWithStackByArgs(memory.ArbitratorOOMRiskKill.String()+defSuffixCompilePlan, sessVars.ConnectionID) } time.Sleep(defOOMRiskCheckDur) } } arbitratorOutOfQuota := !globalMemArbitrator.ConsumeQuotaFromAwaitFreePool(sessVars.ConnectionID, compilePlanMemQuota) defer releaseCommonQuota() if arbitratorOutOfQuota { // for SQL which needs to be controlled by mem-arbitrator if err := intoErrBeforeExec(); err != nil { return nil, err } } } var stmt *executor.ExecStmt var err error { // Transform abstract syntax tree to a physical plan(stored in executor.ExecStmt). compiler := executor.Compiler{Ctx: s} stmt, err = compiler.Compile(ctx, stmtNode) // TODO: report precise tracked heap inuse to the global mem-arbitrator if necessary } // check if resource group hint is valid, can't do this in planner.Optimize because we can access // infoschema there. if sessVars.StmtCtx.ResourceGroupName != sessVars.ResourceGroupName { // if target resource group doesn't exist, fallback to the origin resource group. if _, ok := s.infoCache.GetLatest().ResourceGroupByName(ast.NewCIStr(sessVars.StmtCtx.ResourceGroupName)); !ok { logutil.Logger(ctx).Warn("Unknown resource group from hint", zap.String("name", sessVars.StmtCtx.ResourceGroupName)) sessVars.StmtCtx.ResourceGroupName = sessVars.ResourceGroupName if txn, err := s.Txn(false); err == nil && txn != nil && txn.Valid() { kv.SetTxnResourceGroup(txn, sessVars.ResourceGroupName) } } } if err != nil { s.rollbackOnError(ctx) // Only print log message when this SQL is from the user. // Mute the warning for internal SQLs. if !s.sessionVars.InRestrictedSQL { if !variable.ErrUnknownSystemVar.Equal(err) { sql := stmtNode.Text() sql = parser.Normalize(sql, s.sessionVars.EnableRedactLog) logutil.Logger(ctx).Warn("compile SQL failed", zap.Error(err), zap.String("SQL", sql)) } } return nil, err } durCompile := time.Since(s.sessionVars.StartTime) s.GetSessionVars().DurationCompile = durCompile if s.isInternal() { session_metrics.SessionExecuteCompileDurationInternal.Observe(durCompile.Seconds()) } else { session_metrics.SessionExecuteCompileDurationGeneral.Observe(durCompile.Seconds()) } s.currentPlan = stmt.Plan if execStmt, ok := stmtNode.(*ast.ExecuteStmt); ok { if execStmt.Name == "" { // for exec-stmt on bin-protocol, ignore the plan detail in `show process` to gain performance benefits. s.currentPlan = nil } } // Execute the physical plan. defer logStmt(stmt, s) // defer until txnStartTS is set if sessVars.MemArbitrator.WaitAverse == variable.MemArbitratorNolimit { metrics.GlobalMemArbitratorSubTasks.NoLimit.Inc() } if execUseArbitrator { releaseCommonQuota() reserveSize := min(sessVars.MemArbitrator.QueryReserved, memory.DefMaxLimit) memPriority := memory.ArbitrationPriorityMedium if sg, ok := domain.GetDomain(s).InfoSchema().ResourceGroupByName(ast.NewCIStr(sessVars.StmtCtx.ResourceGroupName)); ok { switch sg.Priority { case ast.LowPriorityValue: memPriority = memory.ArbitrationPriorityLow case ast.MediumPriorityValue: memPriority = memory.ArbitrationPriorityMedium case ast.HighPriorityValue: memPriority = memory.ArbitrationPriorityHigh } } digestKey := normalizedSQL // digestKey := sessVars.StmtCtx.OriginalSQL tracker := sessVars.StmtCtx.MemTracker if !tracker.InitMemArbitrator( globalMemArbitrator, sessVars.MemQuotaQuery, sessVars.MemTracker.Killer, digestKey, memPriority, enableWaitAverse, reserveSize, s.isInternal(), ) { return nil, errors.New("failed to init mem-arbitrator") } defer func() { // detach mem-arbitrator and rethrow panic if any if r := recover(); r != nil { tracker.DetachMemArbitrator() panic(r) } }() } var recordSet sqlexec.RecordSet if stmt.PsStmt != nil { // point plan short path ctx, prevTraceID := resetStmtTraceID(ctx, s) // Emit stmt.start trace event (simplified for point-get fast path) if traceevent.IsEnabled(traceevent.StmtLifecycle) { fields := []zap.Field{ zap.Uint64("conn_id", s.sessionVars.ConnectionID), } // Include previous trace ID to create statement chain if len(prevTraceID) > 0 { fields = append(fields, zap.String("prev_trace_id", redact.Key(prevTraceID))) } traceevent.TraceEvent(ctx, traceevent.StmtLifecycle, "stmt.start", fields...) } // Defer stmt.finish trace event (simplified for point-get fast path) defer func() { if traceevent.IsEnabled(traceevent.StmtLifecycle) { fields := []zap.Field{ zap.Uint64("conn_id", s.sessionVars.ConnectionID), } if err != nil { fields = append(fields, zap.Error(err)) } traceevent.TraceEvent(ctx, traceevent.StmtLifecycle, "stmt.finish", fields...) } }() recordSet, err = stmt.PointGet(ctx) s.setLastTxnInfoBeforeTxnEnd() s.txn.changeToInvalid() } else { recordSet, err = runStmt(ctx, s, stmt) } // Observe the resource group query total counter if the resource control is enabled and the // current session is attached with a resource group. resourceGroupName := s.GetSessionVars().StmtCtx.ResourceGroupName if len(resourceGroupName) > 0 { metrics.ResourceGroupQueryTotalCounter.WithLabelValues(resourceGroupName, resourceGroupName).Inc() } if err != nil { if !errIsNoisy(err) { logutil.Logger(ctx).Warn("run statement failed", zap.Int64("schemaVersion", s.GetInfoSchema().SchemaMetaVersion()), zap.Error(err), zap.String("session", s.String())) } return recordSet, err } if !s.isInternal() && config.GetGlobalConfig().EnableTelemetry { telemetry.CurrentExecuteCount.Inc() tiFlashPushDown, tiFlashExchangePushDown := plannercore.IsTiFlashContained(stmt.Plan) if tiFlashPushDown { telemetry.CurrentTiFlashPushDownCount.Inc() } if tiFlashExchangePushDown { telemetry.CurrentTiFlashExchangePushDownCount.Inc() } } return recordSet, nil } func (s *session) GetSQLExecutor() sqlexec.SQLExecutor { return s } func (s *session) GetRestrictedSQLExecutor() sqlexec.RestrictedSQLExecutor { return s } func (s *session) onTxnManagerStmtStartOrRetry(ctx context.Context, node ast.StmtNode) error { if s.sessionVars.RetryInfo.Retrying { return sessiontxn.GetTxnManager(s).OnStmtRetry(ctx) } return sessiontxn.GetTxnManager(s).OnStmtStart(ctx, node) } func (s *session) validateStatementInTxn(stmtNode ast.StmtNode) error { vars := s.GetSessionVars() if _, ok := stmtNode.(*ast.ImportIntoStmt); ok && vars.InTxn() { return errors.New("cannot run IMPORT INTO in explicit transaction") } return nil } func (s *session) validateStatementReadOnlyInStaleness(stmtNode ast.StmtNode) error { vars := s.GetSessionVars() if !vars.TxnCtx.IsStaleness && vars.TxnReadTS.PeakTxnReadTS() == 0 && !vars.EnableExternalTSRead || vars.InRestrictedSQL { return nil } errMsg := "only support read-only statement during read-only staleness transactions" node := stmtNode.(ast.Node) switch v := node.(type) { case *ast.SplitRegionStmt: return nil case *ast.SelectStmt: // select lock statement needs start a transaction which will be conflict to stale read, // we forbid select lock statement in stale read for now. if v.LockInfo != nil { return errors.New("select lock hasn't been supported in stale read yet") } if !plannercore.IsReadOnly(stmtNode, vars) { return errors.New(errMsg) } return nil case *ast.ExplainStmt, *ast.DoStmt, *ast.ShowStmt, *ast.SetOprStmt, *ast.ExecuteStmt, *ast.SetOprSelectList: if !plannercore.IsReadOnly(stmtNode, vars) { return errors.New(errMsg) } return nil default: } // covered DeleteStmt/InsertStmt/UpdateStmt/CallStmt/LoadDataStmt if _, ok := stmtNode.(ast.DMLNode); ok { return errors.New(errMsg) } return nil } // fileTransInConnKeys contains the keys of queries that will be handled by handleFileTransInConn. var fileTransInConnKeys = []fmt.Stringer{ executor.LoadDataVarKey, executor.LoadStatsVarKey, executor.PlanReplayerLoadVarKey, } func (s *session) hasFileTransInConn() bool { s.mu.RLock() defer s.mu.RUnlock() for _, k := range fileTransInConnKeys { v := s.mu.values[k] if v != nil { return true } } return false } type sqlDigestAlias struct { Digest string } func (digest sqlDigestAlias) sqlDigestDumpTriggerCheck(config *traceevent.DumpTriggerConfig) bool { return config.UserCommand.SQLDigest == digest.Digest } type userAlias struct { user string } func (u userAlias) byUserDumpTriggerCheck(config *traceevent.DumpTriggerConfig) bool { return config.UserCommand.ByUser == u.user } type stmtLabelAlias struct { label string } func (s stmtLabelAlias) stmtLabelDumpTriggerCheck(config *traceevent.DumpTriggerConfig) bool { return config.UserCommand.StmtLabel == s.label } // resetStmtTraceID generates a new trace ID for the current statement, // injects it into the session context for cross-statement correlation, and returns the previous trace ID. func resetStmtTraceID(ctx context.Context, se *session) (context.Context, []byte) { // Capture previous trace ID from session variables (for statement chaining) // We store it in session variables instead of context because the context // is recreated for each statement and doesn't persist across executions prevTraceID := se.sessionVars.PrevTraceID // Inject trace ID into context for correlation across TiDB -> client-go -> TiKV // This enables trace events to be correlated by trace_id field // The trace ID is generated from transaction start_ts and statement count startTS := se.sessionVars.TxnCtx.StartTS stmtCount := uint64(se.sessionVars.TxnCtx.StatementCount) traceID := traceevent.GenerateTraceID(ctx, startTS, stmtCount) ctx = trace.ContextWithTraceID(ctx, traceID) se.currentCtx = ctx // Store trace ID for next statement se.sessionVars.PrevTraceID = traceID return ctx, prevTraceID } // runStmt executes the sqlexec.Statement and commit or rollback the current transaction. func runStmt(ctx context.Context, se *session, s sqlexec.Statement) (rs sqlexec.RecordSet, err error) { failpoint.Inject("assertTxnManagerInRunStmt", func() { sessiontxn.RecordAssert(se, "assertTxnManagerInRunStmt", true) if stmt, ok := s.(*executor.ExecStmt); ok { sessiontxn.AssertTxnManagerInfoSchema(se, stmt.InfoSchema) } }) r, ctx := tracing.StartRegionEx(ctx, "session.runStmt") defer r.End() if r.Span != nil { r.Span.LogKV("sql", s.Text()) } ctx, prevTraceID := resetStmtTraceID(ctx, se) stmtCtx := se.sessionVars.StmtCtx sqlDigest, _ := stmtCtx.SQLDigest() // Make sure StmtType is filled even if succ is false. if stmtCtx.StmtType == "" { stmtCtx.StmtType = stmtctx.GetStmtLabel(ctx, s.GetStmtNode()) } // Emit stmt.start trace event if traceevent.IsEnabled(traceevent.StmtLifecycle) { fields := []zap.Field{ zap.String("sql_digest", sqlDigest), zap.Bool("autocommit", se.sessionVars.IsAutocommit()), zap.Uint64("conn_id", se.sessionVars.ConnectionID), } // Include previous trace ID to create statement chain if len(prevTraceID) > 0 { fields = append(fields, zap.String("prev_trace_id", redact.Key(prevTraceID))) } traceevent.TraceEvent(ctx, traceevent.StmtLifecycle, "stmt.start", fields...) } // Not using closure to avoid unnecessary memory allocation. traceevent.CheckFlightRecorderDumpTrigger(ctx, "dump_trigger.user_command.sql_digest", sqlDigestAlias{sqlDigest}.sqlDigestDumpTriggerCheck) if se.sessionVars.User != nil && se.sessionVars.User.Username != "" { traceevent.CheckFlightRecorderDumpTrigger(ctx, "dump_trigger.user_command.by_user", userAlias{se.sessionVars.User.Username}.byUserDumpTriggerCheck) } traceevent.CheckFlightRecorderDumpTrigger(ctx, "dump_trigger.user_command.stmt_label", stmtLabelAlias{stmtCtx.StmtType}.stmtLabelDumpTriggerCheck) // Defer stmt.finish trace event to capture final state including errors defer func() { if traceevent.IsEnabled(traceevent.StmtLifecycle) { stmtCtx := se.sessionVars.StmtCtx sqlDigest, _ := stmtCtx.SQLDigest() _, planDigest := stmtCtx.GetPlanDigest() var planDigestHex string if planDigest != nil { planDigestHex = hex.EncodeToString(planDigest.Bytes()) } fields := []zap.Field{ zap.String("sql_digest", sqlDigest), zap.String("plan_digest", planDigestHex), zap.Bool("autocommit", se.sessionVars.IsAutocommit()), zap.Uint64("conn_id", se.sessionVars.ConnectionID), zap.Int("retry_count", se.sessionVars.TxnCtx.StatementCount), } if err != nil { fields = append(fields, zap.Error(err)) } traceevent.TraceEvent(ctx, traceevent.StmtLifecycle, "stmt.finish", fields...) } }() se.SetValue(sessionctx.QueryString, s.Text()) if _, ok := s.(*executor.ExecStmt).StmtNode.(ast.DDLNode); ok { se.SetValue(sessionctx.LastExecuteDDL, true) } else { se.ClearValue(sessionctx.LastExecuteDDL) } sessVars := se.sessionVars // Save origTxnCtx here to avoid it reset in the transaction retry. origTxnCtx := sessVars.TxnCtx err = se.checkTxnAborted(s) if err != nil { return nil, err } if sessVars.TxnCtx.CouldRetry && !s.IsReadOnly(sessVars) { // Only when the txn is could retry and the statement is not read only, need to do stmt-count-limit check, // otherwise, the stmt won't be add into stmt history, and also don't need check. // About `stmt-count-limit`, see more in https://docs.pingcap.com/tidb/stable/tidb-configuration-file#stmt-count-limit if err := checkStmtLimit(ctx, se, false); err != nil { return nil, err } } rs, err = s.Exec(ctx) if se.txn.Valid() && se.txn.IsPipelined() { // Pipelined-DMLs can return assertion errors and write conflicts here because they flush // during execution, handle these errors as we would handle errors after a commit. if err != nil { err = se.handleAssertionFailure(ctx, err) } newErr := se.tryReplaceWriteConflictError(ctx, err) if newErr != nil { err = newErr } } se.updateTelemetryMetric(s.(*executor.ExecStmt)) sessVars.TxnCtx.StatementCount++ if rs != nil { if se.GetSessionVars().StmtCtx.IsExplainAnalyzeDML { if !sessVars.InTxn() { se.StmtCommit(ctx) if err := se.CommitTxn(ctx); err != nil { return nil, err } } } return &execStmtResult{ RecordSet: rs, sql: s, se: se, }, err } err = finishStmt(ctx, se, err, s) if se.hasFileTransInConn() { // The query will be handled later in handleFileTransInConn, // then should call the ExecStmt.FinishExecuteStmt to finish this statement. se.SetValue(ExecStmtVarKey, s.(*executor.ExecStmt)) } else { // If it is not a select statement or special query, we record its slow log here, // then it could include the transaction commit time. s.(*executor.ExecStmt).FinishExecuteStmt(origTxnCtx.StartTS, err, false) } return nil, err } // ExecStmtVarKeyType is a dummy type to avoid naming collision in context. type ExecStmtVarKeyType int // String defines a Stringer function for debugging and pretty printing. func (ExecStmtVarKeyType) String() string { return "exec_stmt_var_key" } // ExecStmtVarKey is a variable key for ExecStmt. const ExecStmtVarKey ExecStmtVarKeyType = 0 // execStmtResult is the return value of ExecuteStmt and it implements the sqlexec.RecordSet interface. // Why we need a struct to wrap a RecordSet and provide another RecordSet? // This is because there are so many session state related things that definitely not belongs to the original // RecordSet, so this struct exists and RecordSet.Close() is overridden to handle that. type execStmtResult struct { sqlexec.RecordSet se *session sql sqlexec.Statement once sync.Once closed bool } func (rs *execStmtResult) Finish() error { var err error rs.once.Do(func() { var err1 error if f, ok := rs.RecordSet.(interface{ Finish() error }); ok { err1 = f.Finish() } err2 := finishStmt(context.Background(), rs.se, err, rs.sql) if err1 != nil { err = err1 } else { err = err2 } }) return err } func (rs *execStmtResult) Close() error { if rs.closed { return nil } err1 := rs.Finish() err2 := rs.RecordSet.Close() rs.closed = true if err1 != nil { return err1 } return err2 } func (rs *execStmtResult) TryDetach() (sqlexec.RecordSet, bool, error) { // If `TryDetach` is called, the connection must have set `mysql.ServerStatusCursorExists`, or // the `StatementContext` will be re-used and cause data race. intest.Assert(rs.se.GetSessionVars().HasStatusFlag(mysql.ServerStatusCursorExists)) if !rs.sql.IsReadOnly(rs.se.GetSessionVars()) { return nil, false, nil } if !plannercore.IsAutoCommitTxn(rs.se.GetSessionVars()) { return nil, false, nil } drs, ok := rs.RecordSet.(sqlexec.DetachableRecordSet) if !ok { return nil, false, nil } detachedRS, ok, err := drs.TryDetach() if !ok || err != nil { return nil, ok, err } cursorHandle := rs.se.GetCursorTracker().NewCursor( cursor.State{StartTS: rs.se.GetSessionVars().TxnCtx.StartTS}, ) crs := staticrecordset.WrapRecordSetWithCursor(cursorHandle, detachedRS) // Now, a transaction is not needed for the detached record set, so we commit the transaction and cleanup // the session state. err = finishStmt(context.Background(), rs.se, nil, rs.sql) if err != nil { err2 := detachedRS.Close() if err2 != nil { logutil.BgLogger().Error("close detached record set failed", zap.Error(err2)) } return nil, true, err } return crs, true, nil } // GetExecutor4Test exports the internal executor for test purpose. func (rs *execStmtResult) GetExecutor4Test() any { return rs.RecordSet.(interface{ GetExecutor4Test() any }).GetExecutor4Test() } // rollbackOnError makes sure the next statement starts a new transaction with the latest InfoSchema. func (s *session) rollbackOnError(ctx context.Context) { if !s.sessionVars.InTxn() { s.RollbackTxn(ctx) } } // PrepareStmt is used for executing prepare statement in binary protocol func (s *session) PrepareStmt(sql string) (stmtID uint32, paramCount int, fields []*resolve.ResultField, err error) { defer func() { if s.sessionVars.StmtCtx != nil { s.sessionVars.StmtCtx.DetachMemDiskTracker() } }() if s.sessionVars.TxnCtx.InfoSchema == nil { // We don't need to create a transaction for prepare statement, just get information schema will do. s.sessionVars.TxnCtx.InfoSchema = s.infoCache.GetLatest() } err = s.loadCommonGlobalVariablesIfNeeded() if err != nil { return } ctx := context.Background() // NewPrepareExec may need startTS to build the executor, for example prepare statement has subquery in int. // So we have to call PrepareTxnCtx here. if err = s.PrepareTxnCtx(ctx, nil); err != nil { return } prepareStmt := &ast.PrepareStmt{SQLText: sql} if err = s.onTxnManagerStmtStartOrRetry(ctx, prepareStmt); err != nil { return } if err = sessiontxn.GetTxnManager(s).AdviseWarmup(); err != nil { return } prepareExec := executor.NewPrepareExec(s, sql) err = prepareExec.Next(ctx, nil) // Rollback even if err is nil. s.rollbackOnError(ctx) if err != nil { return } return prepareExec.ID, prepareExec.ParamCount, prepareExec.Fields, nil } // ExecutePreparedStmt executes a prepared statement. func (s *session) ExecutePreparedStmt(ctx context.Context, stmtID uint32, params []expression.Expression) (sqlexec.RecordSet, error) { prepStmt, err := s.sessionVars.GetPreparedStmtByID(stmtID) if err != nil { err = plannererrors.ErrStmtNotFound logutil.Logger(ctx).Error("prepared statement not found", zap.Uint32("stmtID", stmtID)) return nil, err } stmt, ok := prepStmt.(*plannercore.PlanCacheStmt) if !ok { return nil, errors.Errorf("invalid PlanCacheStmt type") } execStmt := &ast.ExecuteStmt{ BinaryArgs: params, PrepStmt: stmt, PrepStmtId: stmtID, } return s.ExecuteStmt(ctx, execStmt) } func (s *session) DropPreparedStmt(stmtID uint32) error { vars := s.sessionVars if _, ok := vars.PreparedStmts[stmtID]; !ok { return plannererrors.ErrStmtNotFound } vars.RetryInfo.DroppedPreparedStmtIDs = append(vars.RetryInfo.DroppedPreparedStmtIDs, stmtID) return nil } func (s *session) Txn(active bool) (kv.Transaction, error) { if !active { return &s.txn, nil } _, err := sessiontxn.GetTxnManager(s).ActivateTxn() s.SetMemoryFootprintChangeHook() return &s.txn, err } func (s *session) SetValue(key fmt.Stringer, value any) { s.mu.Lock() s.mu.values[key] = value s.mu.Unlock() } func (s *session) Value(key fmt.Stringer) any { s.mu.RLock() value := s.mu.values[key] s.mu.RUnlock() return value } func (s *session) ClearValue(key fmt.Stringer) { s.mu.Lock() delete(s.mu.values, key) s.mu.Unlock() } type inCloseSession struct{} // Close function does some clean work when session end. // Close should release the table locks which hold by the session. func (s *session) Close() { // TODO: do clean table locks when session exited without execute Close. // TODO: do clean table locks when tidb-server was `kill -9`. if s.HasLockedTables() && config.TableLockEnabled() { if ds := config.TableLockDelayClean(); ds > 0 { time.Sleep(time.Duration(ds) * time.Millisecond) } lockedTables := s.GetAllTableLocks() err := domain.GetDomain(s).DDLExecutor().UnlockTables(s, lockedTables) if err != nil { logutil.BgLogger().Error("release table lock failed", zap.Uint64("conn", s.sessionVars.ConnectionID)) } } s.ReleaseAllAdvisoryLocks() if s.statsCollector != nil { s.statsCollector.Delete() } if s.idxUsageCollector != nil { s.idxUsageCollector.Flush() } telemetry.GlobalBuiltinFunctionsUsage.Collect(s.GetBuiltinFunctionUsage()) bindValue := s.Value(bindinfo.SessionBindInfoKeyType) if bindValue != nil { bindValue.(bindinfo.SessionBindingHandle).Close() } ctx := context.WithValue(context.TODO(), inCloseSession{}, struct{}{}) s.RollbackTxn(ctx) s.sessionVars.WithdrawAllPreparedStmt() if s.stmtStats != nil { s.stmtStats.SetFinished() } s.sessionVars.ClearDiskFullOpt() if s.sessionPlanCache != nil { s.sessionPlanCache.Close() } if s.sessionVars.ConnectionID != 0 { memory.RemovePoolFromGlobalMemArbitrator(s.sessionVars.ConnectionID) } // Detach session trackers during session cleanup. // ANALYZE attaches session MemTracker to GlobalAnalyzeMemoryTracker; without // detachment, closed sessions cannot be garbage collected. if s.sessionVars.MemTracker != nil { s.sessionVars.MemTracker.Detach() } if s.sessionVars.DiskTracker != nil { s.sessionVars.DiskTracker.Detach() } } // GetSessionVars implements the context.Context interface. func (s *session) GetSessionVars() *variable.SessionVars { return s.sessionVars } // GetPlanCtx returns the PlanContext. func (s *session) GetPlanCtx() planctx.PlanContext { return s.pctx } // GetExprCtx returns the expression context of the session. func (s *session) GetExprCtx() exprctx.ExprContext { return s.exprctx } // GetTableCtx returns the table.MutateContext func (s *session) GetTableCtx() tblctx.MutateContext { return s.tblctx } // GetDistSQLCtx returns the context used in DistSQL func (s *session) GetDistSQLCtx() *distsqlctx.DistSQLContext { vars := s.GetSessionVars() sc := vars.StmtCtx dctx := sc.GetOrInitDistSQLFromCache(func() *distsqlctx.DistSQLContext { // cross ks session does not have domain. dom := s.GetDomain().(*domain.Domain) var rgCtl *rmclient.ResourceGroupsController if dom != nil { rgCtl = dom.ResourceGroupsController() } return &distsqlctx.DistSQLContext{ WarnHandler: sc.WarnHandler, InRestrictedSQL: sc.InRestrictedSQL, Client: s.GetClient(), EnabledRateLimitAction: vars.EnabledRateLimitAction, EnableChunkRPC: vars.EnableChunkRPC, OriginalSQL: sc.OriginalSQL, KVVars: vars.KVVars, KvExecCounter: sc.KvExecCounter, SessionMemTracker: vars.MemTracker, Location: sc.TimeZone(), RuntimeStatsColl: sc.RuntimeStatsColl, SQLKiller: &vars.SQLKiller, CPUUsage: &vars.SQLCPUUsages, ErrCtx: sc.ErrCtx(), TiFlashReplicaRead: vars.TiFlashReplicaRead, TiFlashMaxThreads: vars.TiFlashMaxThreads, TiFlashMaxBytesBeforeExternalJoin: vars.TiFlashMaxBytesBeforeExternalJoin, TiFlashMaxBytesBeforeExternalGroupBy: vars.TiFlashMaxBytesBeforeExternalGroupBy, TiFlashMaxBytesBeforeExternalSort: vars.TiFlashMaxBytesBeforeExternalSort, TiFlashMaxQueryMemoryPerNode: vars.TiFlashMaxQueryMemoryPerNode, TiFlashQuerySpillRatio: vars.TiFlashQuerySpillRatio, TiFlashHashJoinVersion: vars.TiFlashHashJoinVersion, DistSQLConcurrency: vars.DistSQLScanConcurrency(), ReplicaReadType: vars.GetReplicaRead(), WeakConsistency: sc.WeakConsistency, RCCheckTS: sc.RCCheckTS, NotFillCache: sc.NotFillCache, TaskID: sc.TaskID, Priority: sc.Priority, ResourceGroupTagger: sc.GetResourceGroupTagger(), EnablePaging: vars.EnablePaging, MinPagingSize: vars.MinPagingSize, MaxPagingSize: vars.MaxPagingSize, RequestSourceType: vars.RequestSourceType, ExplicitRequestSourceType: vars.ExplicitRequestSourceType, StoreBatchSize: vars.StoreBatchSize, ResourceGroupName: sc.ResourceGroupName, LoadBasedReplicaReadThreshold: vars.LoadBasedReplicaReadThreshold, RunawayChecker: sc.RunawayChecker, RUConsumptionReporter: rgCtl, TiKVClientReadTimeout: vars.GetTiKVClientReadTimeout(), MaxExecutionTime: vars.GetMaxExecutionTime(), ReplicaClosestReadThreshold: vars.ReplicaClosestReadThreshold, ConnectionID: vars.ConnectionID, SessionAlias: vars.SessionAlias, ExecDetails: &sc.SyncExecDetails, } }) // Check if the runaway checker is updated. This is to avoid that evaluating a non-correlated subquery // during the optimization phase will cause the `*distsqlctx.DistSQLContext` to be created before the // runaway checker is set later at the execution phase. // Ref: https://github.com/pingcap/tidb/issues/61899 if dctx.RunawayChecker != sc.RunawayChecker { dctx.RunawayChecker = sc.RunawayChecker } return dctx } // GetRangerCtx returns the context used in `ranger` related functions func (s *session) GetRangerCtx() *rangerctx.RangerContext { vars := s.GetSessionVars() sc := vars.StmtCtx rctx := sc.GetOrInitRangerCtxFromCache(func() any { return &rangerctx.RangerContext{ ExprCtx: s.GetExprCtx(), TypeCtx: s.GetSessionVars().StmtCtx.TypeCtx(), ErrCtx: s.GetSessionVars().StmtCtx.ErrCtx(), RegardNULLAsPoint: s.GetSessionVars().RegardNULLAsPoint, OptPrefixIndexSingleScan: s.GetSessionVars().OptPrefixIndexSingleScan, OptimizerFixControl: s.GetSessionVars().OptimizerFixControl, PlanCacheTracker: &s.GetSessionVars().StmtCtx.PlanCacheTracker, RangeFallbackHandler: &s.GetSessionVars().StmtCtx.RangeFallbackHandler, } }) return rctx.(*rangerctx.RangerContext) } // GetBuildPBCtx returns the context used in `ToPB` method func (s *session) GetBuildPBCtx() *planctx.BuildPBContext { vars := s.GetSessionVars() sc := vars.StmtCtx bctx := sc.GetOrInitBuildPBCtxFromCache(func() any { return &planctx.BuildPBContext{ ExprCtx: s.GetExprCtx(), Client: s.GetClient(), TiFlashFastScan: s.GetSessionVars().TiFlashFastScan, TiFlashFineGrainedShuffleBatchSize: s.GetSessionVars().TiFlashFineGrainedShuffleBatchSize, // the following fields are used to build `expression.PushDownContext`. // TODO: it'd be better to embed `expression.PushDownContext` in `BuildPBContext`. But `expression` already // depends on this package, so we need to move `expression.PushDownContext` to a standalone package first. GroupConcatMaxLen: s.GetSessionVars().GroupConcatMaxLen, InExplainStmt: s.GetSessionVars().StmtCtx.InExplainStmt, WarnHandler: s.GetSessionVars().StmtCtx.WarnHandler, ExtraWarnghandler: s.GetSessionVars().StmtCtx.ExtraWarnHandler, } }) return bctx.(*planctx.BuildPBContext) } func (s *session) AuthPluginForUser(ctx context.Context, user *auth.UserIdentity) (string, error) { pm := privilege.GetPrivilegeManager(s) authplugin, err := pm.GetAuthPluginForConnection(ctx, user.Username, user.Hostname) if err != nil { return "", err } return authplugin, nil } // Auth validates a user using an authentication string and salt. // If the password fails, it will keep trying other users until exhausted. // This means it can not be refactored to use MatchIdentity yet. func (s *session) Auth(user *auth.UserIdentity, authentication, salt []byte, authConn conn.AuthConn) error { hasPassword := "YES" if len(authentication) == 0 { hasPassword = "NO" } pm := privilege.GetPrivilegeManager(s) authUser, err := s.MatchIdentity(context.Background(), user.Username, user.Hostname) if err != nil { return privileges.ErrAccessDenied.FastGenByArgs(user.Username, user.Hostname, hasPassword) } // Check whether continuous login failure is enabled to lock the account. // If enabled, determine whether to unlock the account and notify TiDB to update the cache. enableAutoLock := pm.IsAccountAutoLockEnabled(authUser.Username, authUser.Hostname) if enableAutoLock { err = failedLoginTrackingBegin(s) if err != nil { return err } lockStatusChanged, err := verifyAccountAutoLock(s, authUser.Username, authUser.Hostname) if err != nil { rollbackErr := failedLoginTrackingRollback(s) if rollbackErr != nil { return rollbackErr } return err } err = failedLoginTrackingCommit(s) if err != nil { rollbackErr := failedLoginTrackingRollback(s) if rollbackErr != nil { return rollbackErr } return err } if lockStatusChanged { // Notification auto unlock. err = domain.GetDomain(s).NotifyUpdatePrivilege([]string{authUser.Username}) if err != nil { return err } } } info, err := pm.ConnectionVerification(user, authUser.Username, authUser.Hostname, authentication, salt, s.sessionVars, authConn) if err != nil { if info.FailedDueToWrongPassword { // when user enables the account locking function for consecutive login failures, // the system updates the login failure count and determines whether to lock the account when authentication fails. if enableAutoLock { err := failedLoginTrackingBegin(s) if err != nil { return err } lockStatusChanged, passwordLocking, trackingErr := authFailedTracking(s, authUser.Username, authUser.Hostname) if trackingErr != nil { if rollBackErr := failedLoginTrackingRollback(s); rollBackErr != nil { return rollBackErr } return trackingErr } if err := failedLoginTrackingCommit(s); err != nil { if rollBackErr := failedLoginTrackingRollback(s); rollBackErr != nil { return rollBackErr } return err } if lockStatusChanged { // Notification auto lock. err := autolockAction(s, passwordLocking, authUser.Username, authUser.Hostname) if err != nil { return err } } } } return err } if vardef.EnableResourceControl.Load() && info.ResourceGroupName != "" { s.sessionVars.SetResourceGroupName(info.ResourceGroupName) } if info.InSandBoxMode { // Enter sandbox mode, only execute statement for resetting password. s.EnableSandBoxMode() } if enableAutoLock { err := failedLoginTrackingBegin(s) if err != nil { return err } // The password is correct. If the account is not locked, the number of login failure statistics will be cleared. err = authSuccessClearCount(s, authUser.Username, authUser.Hostname) if err != nil { if rollBackErr := failedLoginTrackingRollback(s); rollBackErr != nil { return rollBackErr } return err } err = failedLoginTrackingCommit(s) if err != nil { if rollBackErr := failedLoginTrackingRollback(s); rollBackErr != nil { return rollBackErr } return err } } pm.AuthSuccess(authUser.Username, authUser.Hostname) user.AuthUsername = authUser.Username user.AuthHostname = authUser.Hostname s.sessionVars.User = user s.sessionVars.ActiveRoles = pm.GetDefaultRoles(context.Background(), user.AuthUsername, user.AuthHostname) return nil } func authSuccessClearCount(s *session, user string, host string) error { // Obtain accurate lock status and failure count information. passwordLocking, err := getFailedLoginUserAttributes(s, user, host) if err != nil { return err } // If the account is locked, it may be caused by the untimely update of the cache, // directly report the account lock. if passwordLocking.AutoAccountLocked { if passwordLocking.PasswordLockTimeDays == -1 { return privileges.GenerateAccountAutoLockErr(passwordLocking.FailedLoginAttempts, user, host, "unlimited", "unlimited") } lds := strconv.FormatInt(passwordLocking.PasswordLockTimeDays, 10) return privileges.GenerateAccountAutoLockErr(passwordLocking.FailedLoginAttempts, user, host, lds, lds) } if passwordLocking.FailedLoginCount != 0 { // If the number of account login failures is not zero, it will be updated to 0. passwordLockingJSON := privileges.BuildSuccessPasswordLockingJSON(passwordLocking.FailedLoginAttempts, passwordLocking.PasswordLockTimeDays) if passwordLockingJSON != "" { if err := s.passwordLocking(user, host, passwordLockingJSON); err != nil { return err } } } return nil } func verifyAccountAutoLock(s *session, user, host string) (bool, error) { pm := privilege.GetPrivilegeManager(s) // Use the cache to determine whether to unlock the account. // If the account needs to be unlocked, read the database information to determine whether // the account needs to be unlocked. Otherwise, an error message is displayed. lockStatusInMemory, err := pm.VerifyAccountAutoLockInMemory(user, host) if err != nil { return false, err } // If the lock status in the cache is Unlock, the automatic unlock is skipped. // If memory synchronization is slow and there is a lock in the database, it will be processed upon successful login. if !lockStatusInMemory { return false, nil } lockStatusChanged := false var plJSON string // After checking the cache, obtain the latest data from the database and determine // whether to automatically unlock the database to prevent repeated unlock errors. pl, err := getFailedLoginUserAttributes(s, user, host) if err != nil { return false, err } if pl.AutoAccountLocked { // If it is locked, need to check whether it can be automatically unlocked. lockTimeDay := pl.PasswordLockTimeDays if lockTimeDay == -1 { return false, privileges.GenerateAccountAutoLockErr(pl.FailedLoginAttempts, user, host, "unlimited", "unlimited") } lastChanged := pl.AutoLockedLastChanged d := time.Now().Unix() - lastChanged if d <= lockTimeDay*24*60*60 { lds := strconv.FormatInt(lockTimeDay, 10) rds := strconv.FormatInt(int64(math.Ceil(float64(lockTimeDay)-float64(d)/(24*60*60))), 10) return false, privileges.GenerateAccountAutoLockErr(pl.FailedLoginAttempts, user, host, lds, rds) } // Generate unlock json string. plJSON = privileges.BuildPasswordLockingJSON(pl.FailedLoginAttempts, pl.PasswordLockTimeDays, "N", 0, time.Now().Format(time.UnixDate)) } if plJSON != "" { lockStatusChanged = true if err = s.passwordLocking(user, host, plJSON); err != nil { return false, err } } return lockStatusChanged, nil } func authFailedTracking(s *session, user string, host string) (bool, *privileges.PasswordLocking, error) { // Obtain the number of consecutive password login failures. passwordLocking, err := getFailedLoginUserAttributes(s, user, host) if err != nil { return false, nil, err } // Consecutive wrong password login failure times +1, // If the lock condition is satisfied, the lock status is updated and the update cache is notified. lockStatusChanged, err := userAutoAccountLocked(s, user, host, passwordLocking) if err != nil { return false, nil, err } return lockStatusChanged, passwordLocking, nil } func autolockAction(s *session, passwordLocking *privileges.PasswordLocking, user, host string) error { // Don't want to update the cache frequently, and only trigger the update cache when the lock status is updated. err := domain.GetDomain(s).NotifyUpdatePrivilege([]string{user}) if err != nil { return err } // The number of failed login attempts reaches FAILED_LOGIN_ATTEMPTS. // An error message is displayed indicating permission denial and account lock. if passwordLocking.PasswordLockTimeDays == -1 { return privileges.GenerateAccountAutoLockErr(passwordLocking.FailedLoginAttempts, user, host, "unlimited", "unlimited") } lds := strconv.FormatInt(passwordLocking.PasswordLockTimeDays, 10) return privileges.GenerateAccountAutoLockErr(passwordLocking.FailedLoginAttempts, user, host, lds, lds) } func (s *session) passwordLocking(user string, host string, newAttributesStr string) error { sql := new(strings.Builder) sqlescape.MustFormatSQL(sql, "UPDATE %n.%n SET ", mysql.SystemDB, mysql.UserTable) sqlescape.MustFormatSQL(sql, "user_attributes=json_merge_patch(coalesce(user_attributes, '{}'), %?)", newAttributesStr) sqlescape.MustFormatSQL(sql, " WHERE Host=%? and User=%?;", host, user) ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnPrivilege) _, err := s.ExecuteInternal(ctx, sql.String()) return err } func failedLoginTrackingBegin(s *session) error { ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnPrivilege) _, err := s.ExecuteInternal(ctx, "BEGIN PESSIMISTIC") return err } func failedLoginTrackingCommit(s *session) error { ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnPrivilege) _, err := s.ExecuteInternal(ctx, "COMMIT") if err != nil { _, rollBackErr := s.ExecuteInternal(ctx, "ROLLBACK") if rollBackErr != nil { return rollBackErr } } return err } func failedLoginTrackingRollback(s *session) error { ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnPrivilege) _, err := s.ExecuteInternal(ctx, "ROLLBACK") return err } // getFailedLoginUserAttributes queries the exact number of consecutive password login failures (concurrency is not allowed). func getFailedLoginUserAttributes(s *session, user string, host string) (*privileges.PasswordLocking, error) { passwordLocking := &privileges.PasswordLocking{} ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnPrivilege) rs, err := s.ExecuteInternal(ctx, `SELECT user_attributes from mysql.user WHERE USER = %? AND HOST = %? for update`, user, host) if err != nil { return passwordLocking, err } defer func() { if closeErr := rs.Close(); closeErr != nil { err = closeErr } }() req := rs.NewChunk(nil) iter := chunk.NewIterator4Chunk(req) err = rs.Next(ctx, req) if err != nil { return passwordLocking, err } if req.NumRows() == 0 { return passwordLocking, fmt.Errorf("user_attributes by `%s`@`%s` not found", user, host) } row := iter.Begin() if !row.IsNull(0) { passwordLockingJSON := row.GetJSON(0) return passwordLocking, passwordLocking.ParseJSON(passwordLockingJSON) } return passwordLocking, fmt.Errorf("user_attributes by `%s`@`%s` not found", user, host) } func userAutoAccountLocked(s *session, user string, host string, pl *privileges.PasswordLocking) (bool, error) { // Indicates whether the user needs to update the lock status change. lockStatusChanged := false // The number of consecutive login failures is stored in the database. // If the current login fails, one is added to the number of consecutive login failures // stored in the database to determine whether the user needs to be locked and the number of update failures. failedLoginCount := pl.FailedLoginCount + 1 // If the cache is not updated, but it is already locked, it will report that the account is locked. if pl.AutoAccountLocked { if pl.PasswordLockTimeDays == -1 { return false, privileges.GenerateAccountAutoLockErr(pl.FailedLoginAttempts, user, host, "unlimited", "unlimited") } lds := strconv.FormatInt(pl.PasswordLockTimeDays, 10) return false, privileges.GenerateAccountAutoLockErr(pl.FailedLoginAttempts, user, host, lds, lds) } autoAccountLocked := "N" autoLockedLastChanged := "" if pl.FailedLoginAttempts == 0 || pl.PasswordLockTimeDays == 0 { return false, nil } if failedLoginCount >= pl.FailedLoginAttempts { autoLockedLastChanged = time.Now().Format(time.UnixDate) autoAccountLocked = "Y" lockStatusChanged = true } newAttributesStr := privileges.BuildPasswordLockingJSON(pl.FailedLoginAttempts, pl.PasswordLockTimeDays, autoAccountLocked, failedLoginCount, autoLockedLastChanged) if newAttributesStr != "" { return lockStatusChanged, s.passwordLocking(user, host, newAttributesStr) } return lockStatusChanged, nil } // MatchIdentity finds the matching username + password in the MySQL privilege tables // for a username + hostname, since MySQL can have wildcards. func (s *session) MatchIdentity(ctx context.Context, username, remoteHost string) (*auth.UserIdentity, error) { pm := privilege.GetPrivilegeManager(s) var success bool var skipNameResolve bool var user = &auth.UserIdentity{} varVal, err := s.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(vardef.SkipNameResolve) if err == nil && variable.TiDBOptOn(varVal) { skipNameResolve = true } user.Username, user.Hostname, success = pm.MatchIdentity(ctx, username, remoteHost, skipNameResolve) if success { return user, nil } // This error will not be returned to the user, access denied will be instead return nil, fmt.Errorf("could not find matching user in MatchIdentity: %s, %s", username, remoteHost) } // AuthWithoutVerification is required by the ResetConnection RPC func (s *session) AuthWithoutVerification(ctx context.Context, user *auth.UserIdentity) bool { pm := privilege.GetPrivilegeManager(s) authUser, err := s.MatchIdentity(ctx, user.Username, user.Hostname) if err != nil { return false } if pm.GetAuthWithoutVerification(authUser.Username, authUser.Hostname) { user.AuthUsername = authUser.Username user.AuthHostname = authUser.Hostname s.sessionVars.User = user s.sessionVars.ActiveRoles = pm.GetDefaultRoles(ctx, user.AuthUsername, user.AuthHostname) return true } return false } // SetSessionStatesHandler implements the Session.SetSessionStatesHandler interface. func (s *session) SetSessionStatesHandler(stateType sessionstates.SessionStateType, handler sessionctx.SessionStatesHandler) { s.sessionStatesHandlers[stateType] = handler } // ReportUsageStats reports the usage stats func (s *session) ReportUsageStats() { if s.idxUsageCollector != nil { s.idxUsageCollector.Report() } } // CreateSession4Test creates a new session environment for test. func CreateSession4Test(store kv.Storage) (sessionapi.Session, error) { se, err := CreateSession4TestWithOpt(store, nil) if err == nil { // Cover both chunk rpc encoding and default encoding. // nolint:gosec if rand.Intn(2) == 0 { se.GetSessionVars().EnableChunkRPC = false } else { se.GetSessionVars().EnableChunkRPC = true } } return se, err } // Opt describes the option for creating session type Opt struct { PreparedPlanCache sessionctx.SessionPlanCache } // CreateSession4TestWithOpt creates a new session environment for test. func CreateSession4TestWithOpt(store kv.Storage, opt *Opt) (sessionapi.Session, error) { s, err := CreateSessionWithOpt(store, opt) if err == nil { // initialize session variables for test. s.GetSessionVars().InitChunkSize = 2 s.GetSessionVars().MaxChunkSize = 32 s.GetSessionVars().MinPagingSize = vardef.DefMinPagingSize s.GetSessionVars().EnablePaging = vardef.DefTiDBEnablePaging s.GetSessionVars().StmtCtx.SetTimeZone(s.GetSessionVars().Location()) err = s.GetSessionVars().SetSystemVarWithoutValidation(vardef.CharacterSetConnection, "utf8mb4") } return s, err } // CreateSession creates a new session environment. func CreateSession(store kv.Storage) (sessionapi.Session, error) { return CreateSessionWithOpt(store, nil) } // CreateSessionWithOpt creates a new session environment with option. // Use default option if opt is nil. func CreateSessionWithOpt(store kv.Storage, opt *Opt) (sessionapi.Session, error) { do, err := domap.Get(store) if err != nil { return nil, err } s, err := createSessionWithOpt(store, do, do.GetSchemaValidator(), do.InfoCache(), opt) if err != nil { return nil, err } // Add auth here. extensions, err := extension.GetExtensions() if err != nil { return nil, err } pm := privileges.NewUserPrivileges(do.PrivilegeHandle(), extensions) privilege.BindPrivilegeManager(s, pm) // Add stats collector, and it will be freed by background stats worker // which periodically updates stats using the collected data. if do.StatsHandle() != nil && do.StatsUpdating() { s.statsCollector = do.StatsHandle().NewSessionStatsItem().(*usage.SessionStatsItem) if config.GetGlobalConfig().Instance.EnableCollectExecutionInfo.Load() { s.idxUsageCollector = do.StatsHandle().NewSessionIndexUsageCollector() } } s.cursorTracker = cursor.NewTracker() return s, nil } // loadCollationParameter loads collation parameter from mysql.tidb func loadCollationParameter(ctx context.Context, se *session) (bool, error) { para, err := se.getTableValue(ctx, mysql.TiDBTable, TidbNewCollationEnabled) if err != nil { return false, err } switch para { case varTrue: return true, nil case varFalse: return false, nil } logutil.BgLogger().Warn( "Unexpected value of 'new_collation_enabled' in 'mysql.tidb', use 'False' instead", zap.String("value", para)) return false, nil } // DatabaseBasicInfo contains the basic information of a database. type DatabaseBasicInfo struct { ID int64 Name string Tables []TableBasicInfo } // TableBasicInfo contains the basic information of a table used in DDL. type TableBasicInfo struct { ID int64 Name string SQL string } type versionedDDLTables struct { ver meta.DDLTableVersion tables []TableBasicInfo } var ( errResultIsEmpty = dbterror.ClassExecutor.NewStd(errno.ErrResultIsEmpty) // DDLJobTables is a list of tables definitions used in concurrent DDL. DDLJobTables = []TableBasicInfo{ {ID: metadef.TiDBDDLJobTableID, Name: "tidb_ddl_job", SQL: metadef.CreateTiDBDDLJobTable}, {ID: metadef.TiDBDDLReorgTableID, Name: "tidb_ddl_reorg", SQL: metadef.CreateTiDBReorgTable}, {ID: metadef.TiDBDDLHistoryTableID, Name: "tidb_ddl_history", SQL: metadef.CreateTiDBDDLHistoryTable}, } // MDLTables is a list of tables definitions used for metadata lock. MDLTables = []TableBasicInfo{ {ID: metadef.TiDBMDLInfoTableID, Name: "tidb_mdl_info", SQL: metadef.CreateTiDBMDLTable}, } // BackfillTables is a list of tables definitions used in dist reorg DDL. BackfillTables = []TableBasicInfo{ {ID: metadef.TiDBBackgroundSubtaskTableID, Name: "tidb_background_subtask", SQL: metadef.CreateTiDBBackgroundSubtaskTable}, {ID: metadef.TiDBBackgroundSubtaskHistoryTableID, Name: "tidb_background_subtask_history", SQL: metadef.CreateTiDBBackgroundSubtaskHistoryTable}, } // DDLNotifierTables contains the table definitions used in DDL notifier. // It only contains the notifier table. // Put it here to reuse a unified initialization function and make it easier to find. DDLNotifierTables = []TableBasicInfo{ {ID: metadef.TiDBDDLNotifierTableID, Name: "tidb_ddl_notifier", SQL: metadef.CreateTiDBDDLNotifierTable}, } ddlTableVersionTables = []versionedDDLTables{ {ver: meta.BaseDDLTableVersion, tables: DDLJobTables}, {ver: meta.MDLTableVersion, tables: MDLTables}, {ver: meta.BackfillTableVersion, tables: BackfillTables}, {ver: meta.DDLNotifierTableVersion, tables: DDLNotifierTables}, } ) func splitAndScatterTable(store kv.Storage, tableIDs []int64) { if s, ok := store.(kv.SplittableStore); ok && atomic.LoadUint32(&ddl.EnableSplitTableRegion) == 1 { ctxWithTimeout, cancel := context.WithTimeout(context.Background(), vardef.DefWaitSplitRegionTimeout*time.Second) defer cancel() keys := make([][]byte, 0, len(tableIDs)) for _, id := range tableIDs { keys = append(keys, tablecodec.GenTablePrefix(id)) } gid := ddl.GlobalScatterGroupID // tables created through DDL during bootstrap also don't scatter, we keep // the same behavior here. _, err := s.SplitRegions(ctxWithTimeout, keys, false, &gid) if err != nil { // It will be automatically split by TiKV later. logutil.BgLogger().Warn("split table region failed", zap.Error(err)) } } } // InitDDLTables creates system tables that DDL uses. Because CREATE TABLE is // also a DDL, we must directly modify KV data to create these tables. func InitDDLTables(store kv.Storage) error { ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL) return kv.RunInNewTxn(ctx, store, true, func(_ context.Context, txn kv.Transaction) error { t := meta.NewMutator(txn) currVer, err := t.GetDDLTableVersion() if err != nil { return errors.Trace(err) } dbID, err := t.CreateMySQLDatabaseIfNotExists() if err != nil { return err } largestVer := currVer for _, vt := range ddlTableVersionTables { if currVer >= vt.ver { continue } logutil.BgLogger().Info("init DDL tables", zap.Int("currVer", int(currVer)), zap.Int("targetVer", int(vt.ver))) largestVer = max(largestVer, vt.ver) if err = createAndSplitTables(store, t, dbID, vt.tables); err != nil { return err } } if largestVer > currVer { return t.SetDDLTableVersion(largestVer) } return nil }) } func createAndSplitTables(store kv.Storage, t *meta.Mutator, dbID int64, tables []TableBasicInfo) error { var ( tableIDs = make([]int64, 0, len(tables)) tblInfos = make([]*model.TableInfo, 0, len(tables)) ) p := parser.New() for _, tbl := range tables { failpoint.InjectCall("mockCreateSystemTableSQL", &tbl) tableIDs = append(tableIDs, tbl.ID) stmt, err := p.ParseOneStmt(tbl.SQL, "", "") if err != nil { return errors.Trace(err) } // bootstrap session set sessionctx.Initing = true, and uses None SQL mode, // we also use it here. evalCtx := exprstatic.NewEvalContext(exprstatic.WithSQLMode(mysql.ModeNone)) exprCtx := exprstatic.NewExprContext(exprstatic.WithEvalCtx(evalCtx)) mbCtx := metabuild.NewContext(metabuild.WithExprCtx(exprCtx)) tblInfo, err := ddl.BuildTableInfoFromAST(mbCtx, stmt.(*ast.CreateTableStmt)) if err != nil { return errors.Trace(err) } tblInfo.State = model.StatePublic tblInfo.ID = tbl.ID tblInfo.UpdateTS = t.StartTS if err = checkSystemTableConstraint(tblInfo); err != nil { return errors.Trace(err) } tblInfos = append(tblInfos, tblInfo) } splitAndScatterTable(store, tableIDs) for _, tblInfo := range tblInfos { err := t.CreateTableOrView(dbID, tblInfo) if err != nil { return errors.Trace(err) } } return nil } // InitMDLVariableForBootstrap initializes the metadata lock variable. func InitMDLVariableForBootstrap(store kv.Storage) error { err := kv.RunInNewTxn(kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL), store, true, func(_ context.Context, txn kv.Transaction) error { t := meta.NewMutator(txn) return t.SetMetadataLock(true) }) if err != nil { return err } vardef.SetEnableMDL(true) return nil } // InitTiDBSchemaCacheSize initializes the tidb schema cache size. func InitTiDBSchemaCacheSize(store kv.Storage) error { var ( isNull bool size uint64 err error ) err = kv.RunInNewTxn(kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL), store, true, func(_ context.Context, txn kv.Transaction) error { t := meta.NewMutator(txn) size, isNull, err = t.GetSchemaCacheSize() if err != nil { return errors.Trace(err) } if isNull { size = vardef.DefTiDBSchemaCacheSize return t.SetSchemaCacheSize(size) } return nil }) if err != nil { return errors.Trace(err) } vardef.SchemaCacheSize.Store(size) return nil } // InitMDLVariable initializes the metadata lock variable. func InitMDLVariable(store kv.Storage) error { isNull := false enable := false var err error err = kv.RunInNewTxn(kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL), store, true, func(_ context.Context, txn kv.Transaction) error { t := meta.NewMutator(txn) enable, isNull, err = t.GetMetadataLock() if err != nil { return err } if isNull { // Workaround for version: nightly-2022-11-07 to nightly-2022-11-17. enable = true logutil.BgLogger().Warn("metadata lock is null") err = t.SetMetadataLock(true) if err != nil { return err } } return nil }) vardef.SetEnableMDL(enable) return err } // BootstrapSession bootstrap session and domain. func BootstrapSession(store kv.Storage) (*domain.Domain, error) { return bootstrapSessionImpl(context.Background(), store, createSessions) } // BootstrapSession4DistExecution bootstrap session and dom for Distributed execution test, only for unit testing. func BootstrapSession4DistExecution(store kv.Storage) (*domain.Domain, error) { return bootstrapSessionImpl(context.Background(), store, createSessions4DistExecution) } // bootstrapSessionImpl bootstraps session and domain. // the process works as follows: // - if we haven't bootstrapped to the target version // - create/init/start domain // - bootstrap or upgrade, some variables will be initialized and stored to system // table in the process, such as system time-zone // - close domain // // - create/init another domain // - initialization global variables from system table that's required to use sessionCtx, // such as system time zone // - start domain and other routines. func bootstrapSessionImpl(ctx context.Context, store kv.Storage, createSessionsImpl func(store kv.Storage, cnt int) ([]*session, error)) (*domain.Domain, error) { ver := getStoreBootstrapVersionWithCache(store) if kv.IsUserKS(store) { systemKSVer := mustGetStoreBootstrapVersion(kvstore.GetSystemStorage()) if systemKSVer == notBootstrapped { logutil.BgLogger().Fatal("SYSTEM keyspace is not bootstrapped") } else if ver > systemKSVer { logutil.BgLogger().Fatal("bootstrap version of user keyspace must be smaller or equal to that of SYSTEM keyspace", zap.Int64("user", ver), zap.Int64("system", systemKSVer)) } } ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnBootstrap) cfg := config.GetGlobalConfig() if len(cfg.Instance.PluginLoad) > 0 { err := plugin.Load(context.Background(), plugin.Config{ Plugins: strings.Split(cfg.Instance.PluginLoad, ","), PluginDir: cfg.Instance.PluginDir, }) if err != nil { return nil, err } } if kerneltype.IsNextGen() { if err := bootstrapSchemas(store); err != nil { return nil, err } } err := InitDDLTables(store) if err != nil { return nil, err } err = InitTiDBSchemaCacheSize(store) if err != nil { return nil, err } if ver < currentBootstrapVersion { runInBootstrapSession(store, ver) } else { logutil.BgLogger().Info("cluster already bootstrapped", zap.Int64("version", ver)) err = InitMDLVariable(store) if err != nil { return nil, err } } // initiate disttask framework components which need a store scheduler.RegisterSchedulerFactory( proto.ImportInto, func(ctx context.Context, task *proto.Task, param scheduler.Param) scheduler.Scheduler { return importinto.NewImportScheduler(ctx, task, param) }, ) taskexecutor.RegisterTaskType( proto.ImportInto, func(ctx context.Context, task *proto.Task, param taskexecutor.Param) taskexecutor.TaskExecutor { return importinto.NewImportExecutor(ctx, task, param) }, ) concurrency := config.GetGlobalConfig().Performance.StatsLoadConcurrency if concurrency == 0 { // if concurrency is 0, we will set the concurrency of sync load by CPU. concurrency = syncload.GetSyncLoadConcurrencyByCPU() } if concurrency < 0 { // it is only for test, in the production, negative value is illegal. concurrency = 0 } ses, err := createSessionsImpl(store, 10) if err != nil { return nil, err } // Mark all bootstrap sessions as restricted since they are used for internal operations // ses[0]: main bootstrap session // ses[1-2]: reserved // ses[3]: privilege loading // ses[4]: sysvar cache // ses[5]: telemetry, expression pushdown // ses[6]: plan replayer collector // ses[7]: dump file GC // ses[8]: historical stats // ses[9]: bootstrap SQL file for i := range ses { ses[i].GetSessionVars().InRestrictedSQL = true } // get system tz from mysql.tidb tz, err := ses[0].getTableValue(ctx, mysql.TiDBTable, tidbSystemTZ) if err != nil { return nil, err } timeutil.SetSystemTZ(tz) // get the flag from `mysql`.`tidb` which indicating if new collations are enabled. newCollationEnabled, err := loadCollationParameter(ctx, ses[0]) if err != nil { return nil, err } collate.SetNewCollationEnabledForTest(newCollationEnabled) // only start the domain after we have initialized some global variables. dom := domain.GetDomain(ses[0]) err = dom.Start(ddl.Normal) if err != nil { return nil, err } // To deal with the location partition failure caused by inconsistent NewCollationEnabled values(see issue #32416). rebuildAllPartitionValueMapAndSorted(ctx, ses[0]) if !config.GetGlobalConfig().Security.SkipGrantTable { err = dom.LoadPrivilegeLoop(ses[3]) if err != nil { return nil, err } } // Rebuild sysvar cache in a loop err = dom.LoadSysVarCacheLoop(ses[4]) if err != nil { return nil, err } // We should make the load bind-info loop before other loops which has internal SQL. // Binding Handle must be initialized after LoadSysVarCacheLoop since // it'll use `tidb_mem_quota_binding_cache` to set the cache size. err = dom.InitBindingHandle() if err != nil { return nil, err } if config.GetGlobalConfig().DisaggregatedTiFlash && !config.GetGlobalConfig().UseAutoScaler { // Invalid client-go tiflash_compute store cache if necessary. err = dom.WatchTiFlashComputeNodeChange() if err != nil { return nil, err } } if err = extensionimpl.Bootstrap(context.Background(), dom); err != nil { return nil, err } if len(cfg.Instance.PluginLoad) > 0 { err := plugin.Init(context.Background(), plugin.Config{EtcdClient: dom.GetEtcdClient()}) if err != nil { return nil, err } } err = executor.LoadExprPushdownBlacklist(ses[5]) if err != nil { return nil, err } err = executor.LoadOptRuleBlacklist(ctx, ses[5]) if err != nil { return nil, err } if config.GetGlobalConfig().EnableTelemetry { // There is no way to turn telemetry on with global variable `tidb_enable_telemetry` // when it is disabled in config. See IsTelemetryEnabled function in telemetry/telemetry.go go func() { dom.TelemetryLoop(ses[5]) }() } planReplayerWorkerCnt := config.GetGlobalConfig().Performance.PlanReplayerDumpWorkerConcurrency planReplayerWorkersSctx := make([]sessionctx.Context, planReplayerWorkerCnt) pworkerSes, err := createSessions(store, int(planReplayerWorkerCnt)) if err != nil { return nil, err } for i := range int(planReplayerWorkerCnt) { planReplayerWorkersSctx[i] = pworkerSes[i] } // setup plan replayer handle dom.SetupPlanReplayerHandle(ses[6], planReplayerWorkersSctx) dom.StartPlanReplayerHandle() // setup dumpFileGcChecker dom.SetupDumpFileGCChecker(ses[7]) dom.DumpFileGcCheckerLoop() // setup historical stats worker dom.SetupHistoricalStatsWorker(ses[8]) dom.StartHistoricalStatsWorker() failToLoadOrParseSQLFile := false // only used for unit test if runBootstrapSQLFile { pm := &privileges.UserPrivileges{ Handle: dom.PrivilegeHandle(), } privilege.BindPrivilegeManager(ses[9], pm) if err := doBootstrapSQLFile(ses[9]); err != nil && intest.EnableInternalCheck { failToLoadOrParseSQLFile = true } } // setup extract Handle extractWorkers := 1 sctxs, err := createSessions(store, extractWorkers) if err != nil { return nil, err } extractWorkerSctxs := make([]sessionctx.Context, 0) for _, sctx := range sctxs { extractWorkerSctxs = append(extractWorkerSctxs, sctx) } dom.SetupExtractHandle(extractWorkerSctxs) // setup init stats loader if err = dom.LoadAndUpdateStatsLoop(concurrency); err != nil { return nil, err } // init the instance plan cache dom.InitInstancePlanCache() // setup workload-based learning worker dom.SetupWorkloadBasedLearningWorker() // start TTL job manager after setup stats collector // because TTL could modify a lot of columns, and need to trigger auto analyze statshandle.AttachStatsCollector = func(s sqlexec.SQLExecutor) sqlexec.SQLExecutor { if s, ok := s.(*session); ok { return attachStatsCollector(s, dom) } return s } statshandle.DetachStatsCollector = func(s sqlexec.SQLExecutor) sqlexec.SQLExecutor { if s, ok := s.(*session); ok { return detachStatsCollector(s) } return s } dom.StartTTLJobManager() dom.LoadSigningCertLoop(cfg.Security.SessionTokenSigningCert, cfg.Security.SessionTokenSigningKey) if raw, ok := store.(kv.EtcdBackend); ok { err = raw.StartGCWorker() if err != nil { return nil, err } } // This only happens in testing, since the failure of loading or parsing sql file // would panic the bootstrapping. if intest.EnableInternalCheck && failToLoadOrParseSQLFile { dom.Close() return nil, errors.New("Fail to load or parse sql file") } err = dom.InitDistTaskLoop() if err != nil { return nil, err } return dom, err } // GetDomain gets the associated domain for store. func GetDomain(store kv.Storage) (*domain.Domain, error) { return domap.Get(store) } // GetOrCreateDomainWithFilter gets the associated domain for store. If domain not created, create a new one with the given schema filter. func GetOrCreateDomainWithFilter(store kv.Storage, filter issyncer.Filter) (*domain.Domain, error) { return domap.GetOrCreateWithFilter(store, filter) } // getStartMode gets the start mode according to the bootstrap version. func getStartMode(ver int64) ddl.StartMode { if ver == notBootstrapped { return ddl.Bootstrap } else if ver < currentBootstrapVersion { return ddl.Upgrade } return ddl.Normal } // runInBootstrapSession create a special session for bootstrap to run. // If no bootstrap and storage is remote, we must use a little lease time to // bootstrap quickly, after bootstrapped, we will reset the lease time. // TODO: Using a bootstrap tool for doing this may be better later. func runInBootstrapSession(store kv.Storage, ver int64) { startMode := getStartMode(ver) startTime := time.Now() defer func() { logutil.BgLogger().Info("bootstrap cluster finished", zap.String("bootMode", string(startMode)), zap.Duration("cost", time.Since(startTime))) }() if startMode == ddl.Upgrade { // TODO at this time domain must not be created, else it will register server // info, and cause deadlock, we need to make sure this in a clear way logutil.BgLogger().Info("[upgrade] get owner lock to upgrade") releaseFn, err := acquireLock(store) if err != nil { logutil.BgLogger().Fatal("[upgrade] get owner lock failed", zap.Error(err)) } defer releaseFn() currVer := mustGetStoreBootstrapVersion(store) if currVer >= currentBootstrapVersion { // It is already bootstrapped/upgraded by another TiDB instance, but // we still need to go through the following domain Start/Close code // right now as we have already initialized it when creating the session, // so we switch to normal mode. // TODO remove this after we can refactor below code out in this case. logutil.BgLogger().Info("[upgrade] already upgraded by other nodes, switch to normal mode") startMode = ddl.Normal } } s, err := createSession(store) if err != nil { // Bootstrap fail will cause program exit. logutil.BgLogger().Fatal("createSession error", zap.Error(err)) } dom := domain.GetDomain(s) err = dom.Start(startMode) if err != nil { // Bootstrap fail will cause program exit. logutil.BgLogger().Fatal("start domain error", zap.Error(err)) } // For the bootstrap SQLs, the following variables should be compatible with old TiDB versions. // TODO we should have a createBootstrapSession to init those special variables. s.sessionVars.EnableClusteredIndex = vardef.ClusteredIndexDefModeIntOnly s.SetValue(sessionctx.Initing, true) switch startMode { case ddl.Bootstrap: bootstrap(s) case ddl.Upgrade: // below sleep is used to mitigate https://github.com/pingcap/tidb/issues/57003, // to let the older owner have time to notice that it's already retired. time.Sleep(owner.WaitTimeOnForceOwner) upgrade(s) case ddl.Normal: // We need to init MDL variable before start the domain to prevent potential stuck issue // when upgrade is skipped. See https://github.com/pingcap/tidb/issues/64539. if err := InitMDLVariable(store); err != nil { logutil.BgLogger().Fatal("init metadata lock failed during normal startup", zap.Error(err)) } } finishBootstrap(store) s.ClearValue(sessionctx.Initing) dom.Close() if intest.InTest { infosync.MockGlobalServerInfoManagerEntry.Close() } domap.Delete(store) } func createSessions(store kv.Storage, cnt int) ([]*session, error) { return createSessionsImpl(store, cnt) } func createSessions4DistExecution(store kv.Storage, cnt int) ([]*session, error) { domap.Delete(store) return createSessionsImpl(store, cnt) } func createSessionsImpl(store kv.Storage, cnt int) ([]*session, error) { // Then we can create new dom ses := make([]*session, cnt) for i := range cnt { se, err := createSession(store) if err != nil { return nil, err } ses[i] = se } return ses, nil } // createSession creates a new session. // Please note that such a session is not tracked by the internal session list. // This means the min ts reporter is not aware of it and may report a wrong min start ts. // In most cases you should use a session pool in domain instead. func createSession(store kv.Storage) (*session, error) { dom, err := domap.Get(store) if err != nil { return nil, err } return createSessionWithOpt(store, dom, dom.GetSchemaValidator(), dom.InfoCache(), nil) } func createCrossKSSession(currKSStore kv.Storage, targetKS string, validator validatorapi.Validator) (*session, error) { if currKSStore.GetKeyspace() == targetKS { return nil, errors.New("cannot create session for the same keyspace") } dom, err := domap.Get(currKSStore) if err != nil { return nil, err } store, err := dom.GetKSStore(targetKS) if err != nil { return nil, err } infoCache, err := dom.GetKSInfoCache(targetKS) if err != nil { return nil, err } // TODO: use the schema validator of the target keyspace when we implement // the info schema syncer for cross keyspace access. return createSessionWithOpt(store, nil, validator, infoCache, nil) } func createSessionWithOpt( store kv.Storage, dom *domain.Domain, schemaValidator validatorapi.Validator, infoCache *infoschema.InfoCache, opt *Opt, ) (*session, error) { var ddlOwnerMgr owner.Manager if dom != nil { // we don't set dom for cross keyspace access. ddlOwnerMgr = dom.DDL().OwnerManager() } crossKS := dom == nil s := &session{ dom: dom, crossKS: crossKS, schemaValidator: schemaValidator, infoCache: infoCache, store: store, ddlOwnerManager: ddlOwnerMgr, client: store.GetClient(), mppClient: store.GetMPPClient(), stmtStats: stmtstats.CreateStatementStats(), sessionStatesHandlers: make(map[sessionstates.SessionStateType]sessionctx.SessionStatesHandler), } s.sessionVars = variable.NewSessionVars(s) s.exprctx = sessionexpr.NewExprContext(s) s.pctx = newPlanContextImpl(s) s.tblctx = tblsession.NewMutateContext(s) s.functionUsageMu.builtinFunctionUsage = make(telemetry.BuiltinFunctionsUsage) if opt != nil && opt.PreparedPlanCache != nil { s.sessionPlanCache = opt.PreparedPlanCache } s.mu.values = make(map[fmt.Stringer]any) s.lockedTables = make(map[int64]model.TableLockTpInfo) s.advisoryLocks = make(map[string]*advisoryLock) // session implements variable.GlobalVarAccessor. Bind it to ctx. s.sessionVars.GlobalVarsAccessor = s s.txn.init() sessionBindHandle := bindinfo.NewSessionBindingHandle() s.SetValue(bindinfo.SessionBindInfoKeyType, sessionBindHandle) s.SetSessionStatesHandler(sessionstates.StateBinding, sessionBindHandle) return s, nil } // attachStatsCollector attaches the stats collector in the dom for the session func attachStatsCollector(s *session, dom *domain.Domain) *session { if dom.StatsHandle() != nil && dom.StatsUpdating() { if s.statsCollector == nil { s.statsCollector = dom.StatsHandle().NewSessionStatsItem().(*usage.SessionStatsItem) } if s.idxUsageCollector == nil && config.GetGlobalConfig().Instance.EnableCollectExecutionInfo.Load() { s.idxUsageCollector = dom.StatsHandle().NewSessionIndexUsageCollector() } } return s } // detachStatsCollector removes the stats collector in the session func detachStatsCollector(s *session) *session { if s.statsCollector != nil { s.statsCollector.Delete() s.statsCollector = nil } if s.idxUsageCollector != nil { s.idxUsageCollector.Flush() s.idxUsageCollector = nil } return s } // CreateSessionWithDomain creates a new Session and binds it with a Domain. // We need this because when we start DDL in Domain, the DDL need a session // to change some system tables. But at that time, we have been already in // a lock context, which cause we can't call createSession directly. func CreateSessionWithDomain(store kv.Storage, dom *domain.Domain) (*session, error) { return createSessionWithOpt(store, dom, dom.GetSchemaValidator(), dom.InfoCache(), nil) } const ( notBootstrapped = 0 ) func mustGetStoreBootstrapVersion(store kv.Storage) int64 { var ver int64 // check in kv store ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnBootstrap) err := kv.RunInNewTxn(ctx, store, false, func(_ context.Context, txn kv.Transaction) error { var err error t := meta.NewReader(txn) ver, err = t.GetBootstrapVersion() return err }) if err != nil { logutil.BgLogger().Fatal("get store bootstrap version failed", zap.Error(err)) } return ver } func getStoreBootstrapVersionWithCache(store kv.Storage) int64 { // check in memory _, ok := store.GetOption(StoreBootstrappedKey) if ok { return currentBootstrapVersion } ver := mustGetStoreBootstrapVersion(store) if ver > notBootstrapped { // here mean memory is not ok, but other server has already finished it store.SetOption(StoreBootstrappedKey, true) } modifyBootstrapVersionForTest(ver) return ver } func finishBootstrap(store kv.Storage) { store.SetOption(StoreBootstrappedKey, true) ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnBootstrap) err := kv.RunInNewTxn(ctx, store, true, func(_ context.Context, txn kv.Transaction) error { t := meta.NewMutator(txn) err := t.FinishBootstrap(currentBootstrapVersion) return err }) if err != nil { logutil.BgLogger().Fatal("finish bootstrap failed", zap.Error(err)) } } const quoteCommaQuote = "', '" // loadCommonGlobalVariablesIfNeeded loads and applies commonly used global variables for the session. func (s *session) loadCommonGlobalVariablesIfNeeded() error { vars := s.sessionVars if vars.CommonGlobalLoaded { return nil } if s.Value(sessionctx.Initing) != nil { // When running bootstrap or upgrade, we should not access global storage. // But we need to init max_allowed_packet to use concat function during bootstrap or upgrade. err := vars.SetSystemVar(vardef.MaxAllowedPacket, strconv.FormatUint(vardef.DefMaxAllowedPacket, 10)) if err != nil { logutil.BgLogger().Error("set system variable max_allowed_packet error", zap.Error(err)) } return nil } vars.CommonGlobalLoaded = true // Deep copy sessionvar cache sessionCache, err := domain.GetDomain(s).GetSessionCache() if err != nil { return err } for varName, varVal := range sessionCache { if _, ok := vars.GetSystemVar(varName); !ok { err = vars.SetSystemVarWithRelaxedValidation(varName, varVal) if err != nil { if variable.ErrUnknownSystemVar.Equal(err) { continue // sessionCache is stale; sysvar has likely been unregistered } return err } } } // when client set Capability Flags CLIENT_INTERACTIVE, init wait_timeout with interactive_timeout if vars.ClientCapability&mysql.ClientInteractive > 0 { if varVal, ok := vars.GetSystemVar(vardef.InteractiveTimeout); ok { if err := vars.SetSystemVar(vardef.WaitTimeout, varVal); err != nil { return err } } } return nil } // PrepareTxnCtx begins a transaction, and creates a new transaction context. // When stmt is provided, it determines transaction mode based on the statement. // When stmt is nil, it uses the session's default transaction mode. func (s *session) PrepareTxnCtx(ctx context.Context, stmt ast.StmtNode) error { s.currentCtx = ctx if s.txn.validOrPending() { return nil } txnMode := s.decideTxnMode(stmt) return sessiontxn.GetTxnManager(s).EnterNewTxn(ctx, &sessiontxn.EnterNewTxnRequest{ Type: sessiontxn.EnterNewTxnBeforeStmt, TxnMode: txnMode, }) } // decideTxnMode determines whether to use pessimistic or optimistic transaction mode // based on the current session state, configuration, and the statement being executed. // When stmt is nil, it uses the session's default transaction mode. func (s *session) decideTxnMode(stmt ast.StmtNode) string { if s.sessionVars.RetryInfo.Retrying { return ast.Pessimistic } if s.sessionVars.TxnMode != ast.Pessimistic { return ast.Optimistic } if !s.sessionVars.IsAutocommit() { return s.sessionVars.TxnMode } if stmt != nil && s.shouldUsePessimisticAutoCommit(stmt) { return ast.Pessimistic } return ast.Optimistic } // shouldUsePessimisticAutoCommit checks if pessimistic-auto-commit should be applied // for the current statement. func (s *session) shouldUsePessimisticAutoCommit(stmtNode ast.StmtNode) bool { // Check if pessimistic-auto-commit is enabled globally if !config.GetGlobalConfig().PessimisticTxn.PessimisticAutoCommit.Load() { return false } // Disabled for bulk DML operations if s.GetSessionVars().BulkDMLEnabled { return false } if s.isInternal() { return false } // Use direct AST inspection to determine if this is a DML statement return s.isDMLStatement(stmtNode) } // isDMLStatement checks if the given statement should use pessimistic-auto-commit. // It handles EXECUTE unwrapping and properly handles EXPLAIN statements by checking their inner statement. func (s *session) isDMLStatement(stmtNode ast.StmtNode) bool { if stmtNode == nil { return false } // Handle EXECUTE statements - unwrap to get the actual prepared statement actualStmt := stmtNode if execStmt, ok := stmtNode.(*ast.ExecuteStmt); ok { prepareStmt, err := plannercore.GetPreparedStmt(execStmt, s.GetSessionVars()) if err != nil || prepareStmt == nil { return false } actualStmt = prepareStmt.PreparedAst.Stmt } // For EXPLAIN statements, check the underlying statement // This ensures EXPLAIN shows the correct plan that would be used if the statement were executed if explainStmt, ok := actualStmt.(*ast.ExplainStmt); ok { return s.isDMLStatement(explainStmt.Stmt) } // Only these DML statements should use pessimistic-auto-commit // Note: LOAD DATA and IMPORT are intentionally excluded switch actualStmt.(type) { case *ast.InsertStmt, *ast.UpdateStmt, *ast.DeleteStmt: return true default: return false } } // PrepareTSFuture uses to try to get ts future. func (s *session) PrepareTSFuture(ctx context.Context, future oracle.Future, scope string) error { if s.txn.Valid() { return errors.New("cannot prepare ts future when txn is valid") } failpoint.Inject("assertTSONotRequest", func() { if _, ok := future.(sessiontxn.ConstantFuture); !ok && !s.isInternal() { panic("tso shouldn't be requested") } }) failpoint.InjectContext(ctx, "mockGetTSFail", func() { future = txnFailFuture{} }) s.txn.changeToPending(&txnFuture{ future: future, store: s.store, txnScope: scope, pipelined: s.usePipelinedDmlOrWarn(ctx), pipelinedFlushConcurrency: s.GetSessionVars().PipelinedFlushConcurrency, pipelinedResolveLockConcurrency: s.GetSessionVars().PipelinedResolveLockConcurrency, pipelinedWriteThrottleRatio: s.GetSessionVars().PipelinedWriteThrottleRatio, }) return nil } // GetPreparedTxnFuture returns the TxnFuture if it is valid or pending. // It returns nil otherwise. func (s *session) GetPreparedTxnFuture() sessionctx.TxnFuture { if !s.txn.validOrPending() { return nil } return &s.txn } // RefreshTxnCtx implements context.RefreshTxnCtx interface. func (s *session) RefreshTxnCtx(ctx context.Context) error { var commitDetail *tikvutil.CommitDetails ctx = context.WithValue(ctx, tikvutil.CommitDetailCtxKey, &commitDetail) err := s.doCommit(ctx) if commitDetail != nil { s.GetSessionVars().StmtCtx.MergeExecDetails(commitDetail) } if err != nil { return err } s.updateStatsDeltaToCollector() return sessiontxn.NewTxn(ctx, s) } // GetStore gets the store of session. func (s *session) GetStore() kv.Storage { return s.store } func (s *session) ShowProcess() *sessmgr.ProcessInfo { return s.processInfo.Load() } // GetStartTSFromSession returns the startTS in the session `se` func GetStartTSFromSession(se any) (startTS, processInfoID uint64) { tmp, ok := se.(*session) if !ok { logutil.BgLogger().Error("GetStartTSFromSession failed, can't transform to session struct") return 0, 0 } txnInfo := tmp.TxnInfo() if txnInfo != nil { startTS = txnInfo.StartTS if txnInfo.ProcessInfo != nil { processInfoID = txnInfo.ProcessInfo.ConnectionID } } logutil.BgLogger().Debug( "GetStartTSFromSession getting startTS of internal session", zap.Uint64("startTS", startTS), zap.Time("start time", oracle.GetTimeFromTS(startTS))) return startTS, processInfoID } // logStmt logs some crucial SQL including: CREATE USER/GRANT PRIVILEGE/CHANGE PASSWORD/DDL etc and normal SQL // if variable.ProcessGeneralLog is set. func logStmt(execStmt *executor.ExecStmt, s *session) { vars := s.GetSessionVars() isCrucial := false switch stmt := execStmt.StmtNode.(type) { case *ast.DropIndexStmt: isCrucial = true if stmt.IsHypo { isCrucial = false } case *ast.CreateIndexStmt: isCrucial = true if stmt.IndexOption != nil && stmt.IndexOption.Tp == ast.IndexTypeHypo { isCrucial = false } case *ast.CreateUserStmt, *ast.DropUserStmt, *ast.AlterUserStmt, *ast.SetPwdStmt, *ast.GrantStmt, *ast.RevokeStmt, *ast.AlterTableStmt, *ast.CreateDatabaseStmt, *ast.CreateTableStmt, *ast.DropDatabaseStmt, *ast.DropTableStmt, *ast.RenameTableStmt, *ast.TruncateTableStmt, *ast.RenameUserStmt, *ast.CreateBindingStmt, *ast.DropBindingStmt, *ast.SetBindingStmt, *ast.BRIEStmt: isCrucial = true } if isCrucial { user := vars.User schemaVersion := s.GetInfoSchema().SchemaMetaVersion() if ss, ok := execStmt.StmtNode.(ast.SensitiveStmtNode); ok { logutil.BgLogger().Info("CRUCIAL OPERATION", zap.Uint64("conn", vars.ConnectionID), zap.Int64("schemaVersion", schemaVersion), zap.String("secure text", ss.SecureText()), zap.Stringer("user", user)) } else { logutil.BgLogger().Info("CRUCIAL OPERATION", zap.Uint64("conn", vars.ConnectionID), zap.Int64("schemaVersion", schemaVersion), zap.String("cur_db", vars.CurrentDB), zap.String("sql", execStmt.StmtNode.Text()), zap.Stringer("user", user)) } } else { logGeneralQuery(execStmt, s, false) } } func logGeneralQuery(execStmt *executor.ExecStmt, s *session, isPrepared bool) { vars := s.GetSessionVars() if vardef.ProcessGeneralLog.Load() && !vars.InRestrictedSQL { var query string if isPrepared { query = execStmt.OriginText() } else { query = execStmt.GetTextToLog(false) } query = executor.QueryReplacer.Replace(query) if vars.EnableRedactLog != errors.RedactLogEnable { query += redact.String(vars.EnableRedactLog, vars.PlanCacheParams.String()) } fields := []zapcore.Field{ zap.Uint64("conn", vars.ConnectionID), zap.String("session_alias", vars.SessionAlias), zap.String("user", vars.User.LoginString()), zap.Int64("schemaVersion", s.GetInfoSchema().SchemaMetaVersion()), zap.Uint64("txnStartTS", vars.TxnCtx.StartTS), zap.Uint64("forUpdateTS", vars.TxnCtx.GetForUpdateTS()), zap.Bool("isReadConsistency", vars.IsIsolation(ast.ReadCommitted)), zap.String("currentDB", vars.CurrentDB), zap.Bool("isPessimistic", vars.TxnCtx.IsPessimistic), zap.String("sessionTxnMode", vars.GetReadableTxnMode()), zap.String("sql", query), } if ot := execStmt.OriginText(); ot != execStmt.Text() { fields = append(fields, zap.String("originText", strconv.Quote(ot))) } logutil.GeneralLogger.Info("GENERAL_LOG", fields...) } } func (s *session) recordOnTransactionExecution(err error, counter int, duration float64, isInternal bool) { if s.sessionVars.TxnCtx.IsPessimistic { if err != nil { if isInternal { session_metrics.TransactionDurationPessimisticAbortInternal.Observe(duration) session_metrics.StatementPerTransactionPessimisticErrorInternal.Observe(float64(counter)) } else { session_metrics.TransactionDurationPessimisticAbortGeneral.Observe(duration) session_metrics.StatementPerTransactionPessimisticErrorGeneral.Observe(float64(counter)) } } else { if isInternal { session_metrics.TransactionDurationPessimisticCommitInternal.Observe(duration) session_metrics.StatementPerTransactionPessimisticOKInternal.Observe(float64(counter)) } else { session_metrics.TransactionDurationPessimisticCommitGeneral.Observe(duration) session_metrics.StatementPerTransactionPessimisticOKGeneral.Observe(float64(counter)) } } } else { if err != nil { if isInternal { session_metrics.TransactionDurationOptimisticAbortInternal.Observe(duration) session_metrics.StatementPerTransactionOptimisticErrorInternal.Observe(float64(counter)) } else { session_metrics.TransactionDurationOptimisticAbortGeneral.Observe(duration) session_metrics.StatementPerTransactionOptimisticErrorGeneral.Observe(float64(counter)) } } else { if isInternal { session_metrics.TransactionDurationOptimisticCommitInternal.Observe(duration) session_metrics.StatementPerTransactionOptimisticOKInternal.Observe(float64(counter)) } else { session_metrics.TransactionDurationOptimisticCommitGeneral.Observe(duration) session_metrics.StatementPerTransactionOptimisticOKGeneral.Observe(float64(counter)) } } } } func (s *session) checkPlacementPolicyBeforeCommit(ctx context.Context) error { var err error // Get the txnScope of the transaction we're going to commit. txnScope := s.GetSessionVars().TxnCtx.TxnScope if txnScope == "" { txnScope = kv.GlobalTxnScope } if txnScope != kv.GlobalTxnScope { is := s.GetInfoSchema().(infoschema.InfoSchema) deltaMap := s.GetSessionVars().TxnCtx.TableDeltaMap for physicalTableID := range deltaMap { var tableName string var partitionName string tblInfo, _, partInfo := is.FindTableByPartitionID(physicalTableID) if tblInfo != nil && partInfo != nil { tableName = tblInfo.Meta().Name.String() partitionName = partInfo.Name.String() } else { tblInfo, _ := is.TableByID(ctx, physicalTableID) tableName = tblInfo.Meta().Name.String() } bundle, ok := is.PlacementBundleByPhysicalTableID(physicalTableID) if !ok { errMsg := fmt.Sprintf("table %v doesn't have placement policies with txn_scope %v", tableName, txnScope) if len(partitionName) > 0 { errMsg = fmt.Sprintf("table %v's partition %v doesn't have placement policies with txn_scope %v", tableName, partitionName, txnScope) } err = dbterror.ErrInvalidPlacementPolicyCheck.GenWithStackByArgs(errMsg) break } dcLocation, ok := bundle.GetLeaderDC(placement.DCLabelKey) if !ok { errMsg := fmt.Sprintf("table %v's leader placement policy is not defined", tableName) if len(partitionName) > 0 { errMsg = fmt.Sprintf("table %v's partition %v's leader placement policy is not defined", tableName, partitionName) } err = dbterror.ErrInvalidPlacementPolicyCheck.GenWithStackByArgs(errMsg) break } if dcLocation != txnScope { errMsg := fmt.Sprintf("table %v's leader location %v is out of txn_scope %v", tableName, dcLocation, txnScope) if len(partitionName) > 0 { errMsg = fmt.Sprintf("table %v's partition %v's leader location %v is out of txn_scope %v", tableName, partitionName, dcLocation, txnScope) } err = dbterror.ErrInvalidPlacementPolicyCheck.GenWithStackByArgs(errMsg) break } // FIXME: currently we assume the physicalTableID is the partition ID. In future, we should consider the situation // if the physicalTableID belongs to a Table. partitionID := physicalTableID tbl, _, partitionDefInfo := is.FindTableByPartitionID(partitionID) if tbl != nil { tblInfo := tbl.Meta() state := tblInfo.Partition.GetStateByID(partitionID) if state == model.StateGlobalTxnOnly { err = dbterror.ErrInvalidPlacementPolicyCheck.GenWithStackByArgs( fmt.Sprintf("partition %s of table %s can not be written by local transactions when its placement policy is being altered", tblInfo.Name, partitionDefInfo.Name)) break } } } } return err } func (s *session) SetPort(port string) { s.sessionVars.Port = port } // GetTxnWriteThroughputSLI implements the Context interface. func (s *session) GetTxnWriteThroughputSLI() *sli.TxnWriteThroughputSLI { return &s.txn.writeSLI } // GetInfoSchema returns snapshotInfoSchema if snapshot schema is set. // Transaction infoschema is returned if inside an explicit txn. // Otherwise the latest infoschema is returned. func (s *session) GetInfoSchema() infoschemactx.MetaOnlyInfoSchema { vars := s.GetSessionVars() var is infoschema.InfoSchema if snap, ok := vars.SnapshotInfoschema.(infoschema.InfoSchema); ok { logutil.BgLogger().Info("use snapshot schema", zap.Uint64("conn", vars.ConnectionID), zap.Int64("schemaVersion", snap.SchemaMetaVersion())) is = snap } else { vars.TxnCtxMu.Lock() if vars.TxnCtx != nil { if tmp, ok := vars.TxnCtx.InfoSchema.(infoschema.InfoSchema); ok { is = tmp } } vars.TxnCtxMu.Unlock() } if is == nil { is = s.infoCache.GetLatest() } // Override the infoschema if the session has temporary table. return temptable.AttachLocalTemporaryTableInfoSchema(s, is) } func (s *session) GetLatestInfoSchema() infoschemactx.MetaOnlyInfoSchema { is := s.infoCache.GetLatest() extIs := &infoschema.SessionExtendedInfoSchema{InfoSchema: is} return temptable.AttachLocalTemporaryTableInfoSchema(s, extIs) } func (s *session) GetLatestISWithoutSessExt() infoschemactx.MetaOnlyInfoSchema { return s.infoCache.GetLatest() } func (s *session) GetSQLServer() sqlsvrapi.Server { return s.dom.(sqlsvrapi.Server) } func (s *session) IsCrossKS() bool { return s.crossKS } func (s *session) GetSchemaValidator() validatorapi.Validator { return s.schemaValidator } func getSnapshotInfoSchema(s sessionctx.Context, snapshotTS uint64) (infoschema.InfoSchema, error) { is, err := domain.GetDomain(s).GetSnapshotInfoSchema(snapshotTS) if err != nil { return nil, err } // Set snapshot does not affect the witness of the local temporary table. // The session always see the latest temporary tables. return temptable.AttachLocalTemporaryTableInfoSchema(s, is), nil } func (s *session) updateTelemetryMetric(es *executor.ExecStmt) { if es.Ti == nil { return } if s.isInternal() { return } ti := es.Ti if ti.UseRecursive { session_metrics.TelemetryCTEUsageRecurCTE.Inc() } else if ti.UseNonRecursive { session_metrics.TelemetryCTEUsageNonRecurCTE.Inc() } else { session_metrics.TelemetryCTEUsageNotCTE.Inc() } if ti.UseIndexMerge { session_metrics.TelemetryIndexMerge.Inc() } if ti.UseMultiSchemaChange { session_metrics.TelemetryMultiSchemaChangeUsage.Inc() } if ti.UseFlashbackToCluster { session_metrics.TelemetryFlashbackClusterUsage.Inc() } if ti.UseExchangePartition { session_metrics.TelemetryExchangePartitionUsage.Inc() } if ti.PartitionTelemetry != nil { if ti.PartitionTelemetry.UseTablePartition { session_metrics.TelemetryTablePartitionUsage.Inc() session_metrics.TelemetryTablePartitionMaxPartitionsUsage.Add(float64(ti.PartitionTelemetry.TablePartitionMaxPartitionsNum)) } if ti.PartitionTelemetry.UseTablePartitionList { session_metrics.TelemetryTablePartitionListUsage.Inc() } if ti.PartitionTelemetry.UseTablePartitionRange { session_metrics.TelemetryTablePartitionRangeUsage.Inc() } if ti.PartitionTelemetry.UseTablePartitionHash { session_metrics.TelemetryTablePartitionHashUsage.Inc() } if ti.PartitionTelemetry.UseTablePartitionRangeColumns { session_metrics.TelemetryTablePartitionRangeColumnsUsage.Inc() } if ti.PartitionTelemetry.UseTablePartitionRangeColumnsGt1 { session_metrics.TelemetryTablePartitionRangeColumnsGt1Usage.Inc() } if ti.PartitionTelemetry.UseTablePartitionRangeColumnsGt2 { session_metrics.TelemetryTablePartitionRangeColumnsGt2Usage.Inc() } if ti.PartitionTelemetry.UseTablePartitionRangeColumnsGt3 { session_metrics.TelemetryTablePartitionRangeColumnsGt3Usage.Inc() } if ti.PartitionTelemetry.UseTablePartitionListColumns { session_metrics.TelemetryTablePartitionListColumnsUsage.Inc() } if ti.PartitionTelemetry.UseCreateIntervalPartition { session_metrics.TelemetryTablePartitionCreateIntervalUsage.Inc() } if ti.PartitionTelemetry.UseAddIntervalPartition { session_metrics.TelemetryTablePartitionAddIntervalUsage.Inc() } if ti.PartitionTelemetry.UseDropIntervalPartition { session_metrics.TelemetryTablePartitionDropIntervalUsage.Inc() } if ti.PartitionTelemetry.UseCompactTablePartition { session_metrics.TelemetryTableCompactPartitionUsage.Inc() } if ti.PartitionTelemetry.UseReorganizePartition { session_metrics.TelemetryReorganizePartitionUsage.Inc() } } if ti.AccountLockTelemetry != nil { session_metrics.TelemetryLockUserUsage.Add(float64(ti.AccountLockTelemetry.LockUser)) session_metrics.TelemetryUnlockUserUsage.Add(float64(ti.AccountLockTelemetry.UnlockUser)) session_metrics.TelemetryCreateOrAlterUserUsage.Add(float64(ti.AccountLockTelemetry.CreateOrAlterUser)) } if ti.UseTableLookUp.Load() && s.sessionVars.StoreBatchSize > 0 { session_metrics.TelemetryStoreBatchedUsage.Inc() } } // GetBuiltinFunctionUsage returns the replica of counting of builtin function usage func (s *session) GetBuiltinFunctionUsage() map[string]uint32 { replica := make(map[string]uint32) s.functionUsageMu.RLock() defer s.functionUsageMu.RUnlock() for key, value := range s.functionUsageMu.builtinFunctionUsage { replica[key] = value } return replica } // BuiltinFunctionUsageInc increase the counting of the builtin function usage func (s *session) BuiltinFunctionUsageInc(scalarFuncSigName string) { s.functionUsageMu.Lock() defer s.functionUsageMu.Unlock() s.functionUsageMu.builtinFunctionUsage.Inc(scalarFuncSigName) } func (s *session) GetStmtStats() *stmtstats.StatementStats { return s.stmtStats } // SetMemoryFootprintChangeHook sets the hook that is called when the memdb changes its size. // Call this after s.txn becomes valid, since TxnInfo is initialized when the txn becomes valid. func (s *session) SetMemoryFootprintChangeHook() { if s.txn.MemHookSet() { return } if config.GetGlobalConfig().Performance.TxnTotalSizeLimit != config.DefTxnTotalSizeLimit { // if the user manually specifies the config, don't involve the new memory tracker mechanism, let the old config // work as before. return } hook := func(mem uint64) { if s.sessionVars.MemDBFootprint == nil { tracker := memory.NewTracker(memory.LabelForMemDB, -1) tracker.AttachTo(s.sessionVars.MemTracker) s.sessionVars.MemDBFootprint = tracker } s.sessionVars.MemDBFootprint.ReplaceBytesUsed(int64(mem)) } s.txn.SetMemoryFootprintChangeHook(hook) } func (s *session) EncodeStates(ctx context.Context, sessionStates *sessionstates.SessionStates) error { // Transaction status is hard to encode, so we do not support it. s.txn.mu.Lock() valid := s.txn.Valid() s.txn.mu.Unlock() if valid { return sessionstates.ErrCannotMigrateSession.GenWithStackByArgs("session has an active transaction") } // Data in local temporary tables is hard to encode, so we do not support it. // Check temporary tables here to avoid circle dependency. if s.sessionVars.LocalTemporaryTables != nil { localTempTables := s.sessionVars.LocalTemporaryTables.(*infoschema.SessionTables) if localTempTables.Count() > 0 { return sessionstates.ErrCannotMigrateSession.GenWithStackByArgs("session has local temporary tables") } } // The advisory locks will be released when the session is closed. if len(s.advisoryLocks) > 0 { return sessionstates.ErrCannotMigrateSession.GenWithStackByArgs("session has advisory locks") } // The TableInfo stores session ID and server ID, so the session cannot be migrated. if len(s.lockedTables) > 0 { return sessionstates.ErrCannotMigrateSession.GenWithStackByArgs("session has locked tables") } // It's insecure to migrate sandBoxMode because users can fake it. if s.InSandBoxMode() { return sessionstates.ErrCannotMigrateSession.GenWithStackByArgs("session is in sandbox mode") } if err := s.sessionVars.EncodeSessionStates(ctx, sessionStates); err != nil { return err } sessionStates.ResourceGroupName = s.sessionVars.ResourceGroupName hasRestrictVarPriv := false checker := privilege.GetPrivilegeManager(s) if checker == nil || checker.RequestDynamicVerification(s.sessionVars.ActiveRoles, "RESTRICTED_VARIABLES_ADMIN", false) { hasRestrictVarPriv = true } // Encode session variables. We put it here instead of SessionVars to avoid cycle import. sessionStates.SystemVars = make(map[string]string) for _, sv := range variable.GetSysVars() { switch { case sv.HasNoneScope(), !sv.HasSessionScope(): // Hidden attribute is deprecated. // None-scoped variables cannot be modified. // Noop variables should also be migrated even if they are noop. continue case sv.ReadOnly: // Skip read-only variables here. We encode them into SessionStates manually. continue } // Get all session variables because the default values may change between versions. val, keep, err := s.sessionVars.GetSessionStatesSystemVar(sv.Name) switch { case err != nil: return err case !keep: continue case !hasRestrictVarPriv && sem.IsEnabled() && sem.IsInvisibleSysVar(sv.Name): // If the variable has a global scope, it should be the same with the global one. // Otherwise, it should be the same with the default value. defaultVal := sv.Value if sv.HasGlobalScope() { // If the session value is the same with the global one, skip it. if defaultVal, err = sv.GetGlobalFromHook(ctx, s.sessionVars); err != nil { return err } } if val != defaultVal { // Case 1: the RESTRICTED_VARIABLES_ADMIN is revoked after setting the session variable. // Case 2: the global variable is updated after the session is created. // In any case, the variable can't be set in the new session, so give up. return sessionstates.ErrCannotMigrateSession.GenWithStackByArgs(fmt.Sprintf("session has set invisible variable '%s'", sv.Name)) } default: sessionStates.SystemVars[sv.Name] = val } } // Encode prepared statements and sql bindings. for _, handler := range s.sessionStatesHandlers { if err := handler.EncodeSessionStates(ctx, s, sessionStates); err != nil { return err } } return nil } func (s *session) DecodeStates(ctx context.Context, sessionStates *sessionstates.SessionStates) error { // Decode prepared statements and sql bindings. for _, handler := range s.sessionStatesHandlers { if err := handler.DecodeSessionStates(ctx, s, sessionStates); err != nil { return err } } // Decode session variables. names := variable.OrderByDependency(sessionStates.SystemVars) // Some variables must be set before others, e.g. tidb_enable_noop_functions should be before noop variables. for _, name := range names { val := sessionStates.SystemVars[name] // Experimental system variables may change scope, data types, or even be removed. // We just ignore the errors and continue. if err := s.sessionVars.SetSystemVar(name, val); err != nil { logutil.Logger(ctx).Warn("set session variable during decoding session states error", zap.String("name", name), zap.String("value", val), zap.Error(err)) } } // Put resource group privilege check from sessionVars to session to avoid circular dependency. if sessionStates.ResourceGroupName != s.sessionVars.ResourceGroupName { hasPriv := true if vardef.EnableResourceControlStrictMode.Load() { checker := privilege.GetPrivilegeManager(s) if checker != nil { hasRgAdminPriv := checker.RequestDynamicVerification(s.sessionVars.ActiveRoles, "RESOURCE_GROUP_ADMIN", false) hasRgUserPriv := checker.RequestDynamicVerification(s.sessionVars.ActiveRoles, "RESOURCE_GROUP_USER", false) hasPriv = hasRgAdminPriv || hasRgUserPriv } } if hasPriv { s.sessionVars.SetResourceGroupName(sessionStates.ResourceGroupName) } else { logutil.Logger(ctx).Warn("set session states error, no privilege to set resource group, skip changing resource group", zap.String("source_resource_group", s.sessionVars.ResourceGroupName), zap.String("target_resource_group", sessionStates.ResourceGroupName)) } } // Decoding session vars / prepared statements may override stmt ctx, such as warnings, // so we decode stmt ctx at last. return s.sessionVars.DecodeSessionStates(ctx, sessionStates) } func (s *session) setRequestSource(ctx context.Context, stmtLabel string, stmtNode ast.StmtNode) { if !s.isInternal() { if txn, _ := s.Txn(false); txn != nil && txn.Valid() { if txn.IsPipelined() { stmtLabel = "pdml" } txn.SetOption(kv.RequestSourceType, stmtLabel) } s.sessionVars.RequestSourceType = stmtLabel return } if source := ctx.Value(kv.RequestSourceKey); source != nil { requestSource := source.(kv.RequestSource) if requestSource.RequestSourceType != "" { s.sessionVars.RequestSourceType = requestSource.RequestSourceType return } } // panic in test mode in case there are requests without source in the future. // log warnings in production mode. if intest.EnableInternalCheck { panic("unexpected no source type context, if you see this error, " + "the `RequestSourceTypeKey` is missing in your context") } logutil.Logger(ctx).Warn("unexpected no source type context, if you see this warning, "+ "the `RequestSourceTypeKey` is missing in the context", zap.Bool("internal", s.isInternal()), zap.String("sql", stmtNode.Text())) } // NewStmtIndexUsageCollector creates a new `*indexusage.StmtIndexUsageCollector` based on the internal session index // usage collector func (s *session) NewStmtIndexUsageCollector() *indexusage.StmtIndexUsageCollector { if s.idxUsageCollector == nil { return nil } return indexusage.NewStmtIndexUsageCollector(s.idxUsageCollector) } // usePipelinedDmlOrWarn returns the current statement can be executed as a pipelined DML. func (s *session) usePipelinedDmlOrWarn(ctx context.Context) bool { if !s.sessionVars.BulkDMLEnabled { return false } stmtCtx := s.sessionVars.StmtCtx if stmtCtx == nil { return false } if stmtCtx.IsReadOnly { return false } vars := s.GetSessionVars() if !vars.TxnCtx.EnableMDL { stmtCtx.AppendWarning( errors.New( "Pipelined DML can not be used without Metadata Lock. Fallback to standard mode", ), ) return false } if (vars.BatchCommit || vars.BatchInsert || vars.BatchDelete) && vars.DMLBatchSize > 0 && vardef.EnableBatchDML.Load() { stmtCtx.AppendWarning(errors.New("Pipelined DML can not be used with the deprecated Batch DML. Fallback to standard mode")) return false } if !(stmtCtx.InInsertStmt || stmtCtx.InDeleteStmt || stmtCtx.InUpdateStmt) { if !stmtCtx.IsReadOnly { stmtCtx.AppendWarning(errors.New("Pipelined DML can only be used for auto-commit INSERT, REPLACE, UPDATE or DELETE. Fallback to standard mode")) } return false } if s.isInternal() { stmtCtx.AppendWarning(errors.New("Pipelined DML can not be used for internal SQL. Fallback to standard mode")) return false } if vars.InTxn() { stmtCtx.AppendWarning(errors.New("Pipelined DML can not be used in transaction. Fallback to standard mode")) return false } if !vars.IsAutocommit() { stmtCtx.AppendWarning(errors.New("Pipelined DML can only be used in autocommit mode. Fallback to standard mode")) return false } if s.GetSessionVars().ConstraintCheckInPlace { // we enforce that pipelined DML must lazily check key. stmtCtx.AppendWarning( errors.New( "Pipelined DML can not be used when tidb_constraint_check_in_place=ON. " + "Fallback to standard mode", ), ) return false } is, ok := s.GetLatestInfoSchema().(infoschema.InfoSchema) if !ok { stmtCtx.AppendWarning(errors.New("Pipelined DML failed to get latest InfoSchema. Fallback to standard mode")) return false } for _, t := range stmtCtx.Tables { // get table schema from current infoschema tbl, err := is.TableByName(ctx, ast.NewCIStr(t.DB), ast.NewCIStr(t.Table)) if err != nil { stmtCtx.AppendWarning(errors.New("Pipelined DML failed to get table schema. Fallback to standard mode")) return false } if tbl.Meta().IsView() { stmtCtx.AppendWarning(errors.New("Pipelined DML can not be used on view. Fallback to standard mode")) return false } if tbl.Meta().IsSequence() { stmtCtx.AppendWarning(errors.New("Pipelined DML can not be used on sequence. Fallback to standard mode")) return false } if vars.ForeignKeyChecks && (len(tbl.Meta().ForeignKeys) > 0 || len(is.GetTableReferredForeignKeys(t.DB, t.Table)) > 0) { stmtCtx.AppendWarning( errors.New( "Pipelined DML can not be used on table with foreign keys when foreign_key_checks = ON. Fallback to standard mode", ), ) return false } if tbl.Meta().TempTableType != model.TempTableNone { stmtCtx.AppendWarning( errors.New( "Pipelined DML can not be used on temporary tables. " + "Fallback to standard mode", ), ) return false } if tbl.Meta().TableCacheStatusType != model.TableCacheStatusDisable { stmtCtx.AppendWarning( errors.New( "Pipelined DML can not be used on cached tables. " + "Fallback to standard mode", ), ) return false } } // tidb_dml_type=bulk will invalidate the config pessimistic-auto-commit. // The behavior is as if the config is set to false. But we generate a warning for it. if config.GetGlobalConfig().PessimisticTxn.PessimisticAutoCommit.Load() { stmtCtx.AppendWarning( errors.New( "pessimistic-auto-commit config is ignored in favor of Pipelined DML", ), ) } return true } // GetDBNames gets the sql layer database names from the session. func GetDBNames(seVar *variable.SessionVars) []string { dbNames := make(map[string]struct{}) if seVar == nil || !config.GetGlobalConfig().Status.RecordDBLabel { return []string{""} } if seVar.StmtCtx != nil { for _, t := range seVar.StmtCtx.Tables { dbNames[t.DB] = struct{}{} } } if len(dbNames) == 0 { dbNames[strings.ToLower(seVar.CurrentDB)] = struct{}{} } ns := make([]string, 0, len(dbNames)) for n := range dbNames { ns = append(ns, n) } return ns } // GetCursorTracker returns the internal `cursor.Tracker` func (s *session) GetCursorTracker() cursor.Tracker { return s.cursorTracker } // GetCommitWaitGroup returns the internal `sync.WaitGroup` for async commit and secondary key lock cleanup func (s *session) GetCommitWaitGroup() *sync.WaitGroup { return &s.commitWaitGroup } // GetDomain get domain from session. func (s *session) GetDomain() any { return s.dom }