@ -299,8 +299,8 @@ func TestShow(t *testing.T) {
|
||||
|
||||
tk.MustQuery("SHOW PROCEDURE STATUS WHERE Db='test'").Check(testkit.Rows())
|
||||
tk.MustQuery("SHOW TRIGGERS WHERE `Trigger` ='test'").Check(testkit.Rows())
|
||||
tk.MustQuery("SHOW PROCESSLIST;").Check(testkit.Rows("1 test Sleep 0 autocommit SHOW PROCESSLIST;"))
|
||||
tk.MustQuery("SHOW FULL PROCESSLIST;").Check(testkit.Rows("1 test Sleep 0 autocommit SHOW FULL PROCESSLIST;"))
|
||||
tk.MustQuery("SHOW PROCESSLIST;").Check(testkit.Rows(fmt.Sprintf("%d test Sleep 0 autocommit SHOW PROCESSLIST;", tk.Session().ShowProcess().ID)))
|
||||
tk.MustQuery("SHOW FULL PROCESSLIST;").Check(testkit.Rows(fmt.Sprintf("%d test Sleep 0 autocommit SHOW FULL PROCESSLIST;", tk.Session().ShowProcess().ID)))
|
||||
tk.MustQuery("SHOW EVENTS WHERE Db = 'test'").Check(testkit.Rows())
|
||||
tk.MustQuery("SHOW PLUGINS").Check(testkit.Rows())
|
||||
tk.MustQuery("SHOW PROFILES").Check(testkit.Rows())
|
||||
|
||||
@ -3550,15 +3550,21 @@ func (s *session) setRequestSource(ctx context.Context, stmtLabel string, stmtNo
|
||||
|
||||
// RemoveLockDDLJobs removes the DDL jobs which doesn't get the metadata lock from job2ver.
|
||||
func RemoveLockDDLJobs(s Session, job2ver map[int64]int64, job2ids map[int64]string) {
|
||||
if s.GetSessionVars().InRestrictedSQL {
|
||||
sv := s.GetSessionVars()
|
||||
if sv.InRestrictedSQL {
|
||||
return
|
||||
}
|
||||
s.GetSessionVars().GetRelatedTableForMDL().Range(func(tblID, value any) bool {
|
||||
sv.TxnCtxMu.Lock()
|
||||
defer sv.TxnCtxMu.Unlock()
|
||||
if sv.TxnCtx == nil {
|
||||
return
|
||||
}
|
||||
sv.GetRelatedTableForMDL().Range(func(tblID, value any) bool {
|
||||
for jobID, ver := range job2ver {
|
||||
ids := util.Str2Int64Map(job2ids[jobID])
|
||||
if _, ok := ids[tblID.(int64)]; ok && value.(int64) < ver {
|
||||
delete(job2ver, jobID)
|
||||
logutil.BgLogger().Debug("old running transaction block DDL", zap.Int64("table ID", tblID.(int64)), zap.Uint64("conn ID", s.GetSessionVars().ConnectionID))
|
||||
logutil.BgLogger().Debug("old running transaction block DDL", zap.Int64("table ID", tblID.(int64)), zap.Uint64("conn ID", sv.ConnectionID))
|
||||
}
|
||||
}
|
||||
return true
|
||||
|
||||
@ -642,6 +642,8 @@ type SessionVars struct {
|
||||
RetryInfo *RetryInfo
|
||||
// TxnCtx Should be reset on transaction finished.
|
||||
TxnCtx *TransactionContext
|
||||
// TxnCtxMu is used to protect TxnCtx.
|
||||
TxnCtxMu sync.Mutex
|
||||
|
||||
// TxnManager is used to manage txn context in session
|
||||
TxnManager interface{}
|
||||
|
||||
@ -119,7 +119,9 @@ func (p *baseTxnContextProvider) OnInitialize(ctx context.Context, tp sessiontxn
|
||||
if p.onInitializeTxnCtx != nil {
|
||||
p.onInitializeTxnCtx(txnCtx)
|
||||
}
|
||||
sessVars.TxnCtxMu.Lock()
|
||||
sessVars.TxnCtx = txnCtx
|
||||
sessVars.TxnCtxMu.Unlock()
|
||||
if variable.EnableMDL.Load() {
|
||||
sessVars.TxnCtx.EnableMDL = true
|
||||
}
|
||||
|
||||
@ -119,6 +119,7 @@ func (p *StalenessTxnContextProvider) activateStaleTxn() error {
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
sessVars.TxnCtxMu.Lock()
|
||||
sessVars.TxnCtx = &variable.TransactionContext{
|
||||
TxnCtxNoNeedToRestore: variable.TxnCtxNoNeedToRestore{
|
||||
InfoSchema: is,
|
||||
@ -129,6 +130,7 @@ func (p *StalenessTxnContextProvider) activateStaleTxn() error {
|
||||
TxnScope: txnScope,
|
||||
},
|
||||
}
|
||||
sessVars.TxnCtxMu.Unlock()
|
||||
|
||||
if interceptor := temptable.SessionSnapshotInterceptor(p.sctx, is); interceptor != nil {
|
||||
txn.SetOption(kv.SnapInterceptor, interceptor)
|
||||
|
||||
Reference in New Issue
Block a user