Files
tidb/pkg/testkit/mocksessionmanager.go
2025-11-14 13:32:11 +00:00

215 lines
6.3 KiB
Go

// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package testkit
import (
"crypto/tls"
"maps"
"sync"
"github.com/pingcap/tidb/pkg/domain"
"github.com/pingcap/tidb/pkg/infoschema/issyncer/mdldef"
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/parser/auth"
"github.com/pingcap/tidb/pkg/planner/core"
"github.com/pingcap/tidb/pkg/session/sessionapi"
"github.com/pingcap/tidb/pkg/session/sessmgr"
"github.com/pingcap/tidb/pkg/session/txninfo"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
)
// MockSessionManager is a mocked session manager which is used for test.
type MockSessionManager struct {
PS []*sessmgr.ProcessInfo
PSMu sync.RWMutex
SerID uint64
TxnInfo []*txninfo.TxnInfo
Dom *domain.Domain
Conn map[uint64]sessionapi.Session
mu sync.Mutex
ConAttrs map[uint64]map[string]string
internalSessions map[any]struct{}
}
// ShowTxnList is to show txn list.
func (msm *MockSessionManager) ShowTxnList() []*txninfo.TxnInfo {
msm.mu.Lock()
defer msm.mu.Unlock()
if len(msm.TxnInfo) > 0 {
return msm.TxnInfo
}
rs := make([]*txninfo.TxnInfo, 0, len(msm.Conn))
for _, se := range msm.Conn {
info := se.TxnInfo()
if info != nil && info.ProcessInfo != nil {
rs = append(rs, info)
}
}
return rs
}
// ShowProcessList implements the Manager.ShowProcessList interface.
func (msm *MockSessionManager) ShowProcessList() map[uint64]*sessmgr.ProcessInfo {
msm.PSMu.RLock()
defer msm.PSMu.RUnlock()
ret := make(map[uint64]*sessmgr.ProcessInfo)
if len(msm.PS) > 0 {
for _, item := range msm.PS {
ret[item.ID] = item
}
return ret
}
msm.mu.Lock()
for connID, pi := range msm.Conn {
ret[connID] = pi.ShowProcess()
}
msm.mu.Unlock()
if msm.Dom != nil {
maps.Copy(ret, msm.Dom.SysProcTracker().GetSysProcessList())
}
return ret
}
// GetProcessInfo implements the Manager.GetProcessInfo interface.
func (msm *MockSessionManager) GetProcessInfo(id uint64) (*sessmgr.ProcessInfo, bool) {
msm.PSMu.RLock()
defer msm.PSMu.RUnlock()
for _, item := range msm.PS {
if item.ID == id {
return item, true
}
}
msm.mu.Lock()
defer msm.mu.Unlock()
if sess := msm.Conn[id]; sess != nil {
return sess.ShowProcess(), true
}
if msm.Dom != nil {
if pinfo, ok := msm.Dom.SysProcTracker().GetSysProcessList()[id]; ok {
return pinfo, true
}
}
return &sessmgr.ProcessInfo{}, false
}
// GetConAttrs returns the connection attributes of all connections
func (msm *MockSessionManager) GetConAttrs(user *auth.UserIdentity) map[uint64]map[string]string {
return msm.ConAttrs
}
// Kill implements the Manager.Kill interface.
func (*MockSessionManager) Kill(uint64, bool, bool, bool) {
}
// KillAllConnections implements the Manager.KillAllConnections interface.
func (*MockSessionManager) KillAllConnections() {
}
// UpdateTLSConfig implements the Manager.UpdateTLSConfig interface.
func (*MockSessionManager) UpdateTLSConfig(*tls.Config) {
}
// ServerID get server id.
func (msm *MockSessionManager) ServerID() uint64 {
return msm.SerID
}
// StoreInternalSession is to store internal session.
func (msm *MockSessionManager) StoreInternalSession(s any) {
msm.mu.Lock()
if msm.internalSessions == nil {
msm.internalSessions = make(map[any]struct{})
}
msm.internalSessions[s] = struct{}{}
msm.mu.Unlock()
}
// ContainsInternalSession checks if the internal session pointer is in the map in the Manager
func (msm *MockSessionManager) ContainsInternalSession(se any) bool {
msm.mu.Lock()
defer msm.mu.Unlock()
if msm.internalSessions == nil {
return false
}
_, ok := msm.internalSessions[se]
return ok
}
// InternalSessionCount implements the Manager.InternalSessionCount interface.
func (msm *MockSessionManager) InternalSessionCount() int {
msm.mu.Lock()
defer msm.mu.Unlock()
return len(msm.internalSessions)
}
// DeleteInternalSession is to delete the internal session pointer from the map in the Manager
func (msm *MockSessionManager) DeleteInternalSession(s any) {
msm.mu.Lock()
delete(msm.internalSessions, s)
msm.mu.Unlock()
}
// GetInternalSessionStartTSList is to get all startTS of every transaction running in the current internal sessions
func (msm *MockSessionManager) GetInternalSessionStartTSList() []uint64 {
msm.mu.Lock()
defer msm.mu.Unlock()
ret := make([]uint64, 0, len(msm.internalSessions))
for internalSess := range msm.internalSessions {
// Ref the implementation of `GetInternalSessionStartTSList` on the real session manager. The `TxnInfo` is more
// accurate, because if a session is pending, the `StartTS` in `sessVars.TxnCtx` will not be updated. For example,
// if there is not DDL for a long time, the minimal internal session start ts will not have any progress.
if se, ok := internalSess.(interface{ TxnInfo() *txninfo.TxnInfo }); ok {
txn := se.TxnInfo()
if txn != nil {
ret = append(ret, txn.StartTS)
}
continue
}
}
return ret
}
// KillNonFlashbackClusterConn implement Manager interface.
func (msm *MockSessionManager) KillNonFlashbackClusterConn() {
for _, se := range msm.Conn {
processInfo := se.ShowProcess()
ddl, ok := processInfo.StmtCtx.GetPlan().(*core.DDL)
if !ok {
msm.Kill(se.GetSessionVars().ConnectionID, false, false, false)
continue
}
_, ok = ddl.Statement.(*ast.FlashBackToTimestampStmt)
if !ok {
msm.Kill(se.GetSessionVars().ConnectionID, false, false, false)
continue
}
}
}
// CheckOldRunningTxn is to get all startTS of every transactions running in the current internal sessions
func (msm *MockSessionManager) CheckOldRunningTxn(jobs map[int64]*mdldef.JobMDL) {
msm.mu.Lock()
for _, se := range msm.Conn {
variable.RemoveLockDDLJobs(se.GetSessionVars(), jobs, false)
}
msm.mu.Unlock()
}
// GetStatusVars is getting the per-session status variables
func (msm *MockSessionManager) GetStatusVars() map[uint64]map[string]string {
return map[uint64]map[string]string{}
}