215 lines
6.3 KiB
Go
215 lines
6.3 KiB
Go
// Copyright 2022 PingCAP, Inc.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package testkit
|
|
|
|
import (
|
|
"crypto/tls"
|
|
"maps"
|
|
"sync"
|
|
|
|
"github.com/pingcap/tidb/pkg/domain"
|
|
"github.com/pingcap/tidb/pkg/infoschema/issyncer/mdldef"
|
|
"github.com/pingcap/tidb/pkg/parser/ast"
|
|
"github.com/pingcap/tidb/pkg/parser/auth"
|
|
"github.com/pingcap/tidb/pkg/planner/core"
|
|
"github.com/pingcap/tidb/pkg/session/sessionapi"
|
|
"github.com/pingcap/tidb/pkg/session/sessmgr"
|
|
"github.com/pingcap/tidb/pkg/session/txninfo"
|
|
"github.com/pingcap/tidb/pkg/sessionctx/variable"
|
|
)
|
|
|
|
// MockSessionManager is a mocked session manager which is used for test.
|
|
type MockSessionManager struct {
|
|
PS []*sessmgr.ProcessInfo
|
|
PSMu sync.RWMutex
|
|
SerID uint64
|
|
TxnInfo []*txninfo.TxnInfo
|
|
Dom *domain.Domain
|
|
Conn map[uint64]sessionapi.Session
|
|
mu sync.Mutex
|
|
ConAttrs map[uint64]map[string]string
|
|
|
|
internalSessions map[any]struct{}
|
|
}
|
|
|
|
// ShowTxnList is to show txn list.
|
|
func (msm *MockSessionManager) ShowTxnList() []*txninfo.TxnInfo {
|
|
msm.mu.Lock()
|
|
defer msm.mu.Unlock()
|
|
if len(msm.TxnInfo) > 0 {
|
|
return msm.TxnInfo
|
|
}
|
|
rs := make([]*txninfo.TxnInfo, 0, len(msm.Conn))
|
|
for _, se := range msm.Conn {
|
|
info := se.TxnInfo()
|
|
if info != nil && info.ProcessInfo != nil {
|
|
rs = append(rs, info)
|
|
}
|
|
}
|
|
return rs
|
|
}
|
|
|
|
// ShowProcessList implements the Manager.ShowProcessList interface.
|
|
func (msm *MockSessionManager) ShowProcessList() map[uint64]*sessmgr.ProcessInfo {
|
|
msm.PSMu.RLock()
|
|
defer msm.PSMu.RUnlock()
|
|
ret := make(map[uint64]*sessmgr.ProcessInfo)
|
|
if len(msm.PS) > 0 {
|
|
for _, item := range msm.PS {
|
|
ret[item.ID] = item
|
|
}
|
|
return ret
|
|
}
|
|
msm.mu.Lock()
|
|
for connID, pi := range msm.Conn {
|
|
ret[connID] = pi.ShowProcess()
|
|
}
|
|
msm.mu.Unlock()
|
|
if msm.Dom != nil {
|
|
maps.Copy(ret, msm.Dom.SysProcTracker().GetSysProcessList())
|
|
}
|
|
return ret
|
|
}
|
|
|
|
// GetProcessInfo implements the Manager.GetProcessInfo interface.
|
|
func (msm *MockSessionManager) GetProcessInfo(id uint64) (*sessmgr.ProcessInfo, bool) {
|
|
msm.PSMu.RLock()
|
|
defer msm.PSMu.RUnlock()
|
|
for _, item := range msm.PS {
|
|
if item.ID == id {
|
|
return item, true
|
|
}
|
|
}
|
|
msm.mu.Lock()
|
|
defer msm.mu.Unlock()
|
|
if sess := msm.Conn[id]; sess != nil {
|
|
return sess.ShowProcess(), true
|
|
}
|
|
if msm.Dom != nil {
|
|
if pinfo, ok := msm.Dom.SysProcTracker().GetSysProcessList()[id]; ok {
|
|
return pinfo, true
|
|
}
|
|
}
|
|
return &sessmgr.ProcessInfo{}, false
|
|
}
|
|
|
|
// GetConAttrs returns the connection attributes of all connections
|
|
func (msm *MockSessionManager) GetConAttrs(user *auth.UserIdentity) map[uint64]map[string]string {
|
|
return msm.ConAttrs
|
|
}
|
|
|
|
// Kill implements the Manager.Kill interface.
|
|
func (*MockSessionManager) Kill(uint64, bool, bool, bool) {
|
|
}
|
|
|
|
// KillAllConnections implements the Manager.KillAllConnections interface.
|
|
func (*MockSessionManager) KillAllConnections() {
|
|
}
|
|
|
|
// UpdateTLSConfig implements the Manager.UpdateTLSConfig interface.
|
|
func (*MockSessionManager) UpdateTLSConfig(*tls.Config) {
|
|
}
|
|
|
|
// ServerID get server id.
|
|
func (msm *MockSessionManager) ServerID() uint64 {
|
|
return msm.SerID
|
|
}
|
|
|
|
// StoreInternalSession is to store internal session.
|
|
func (msm *MockSessionManager) StoreInternalSession(s any) {
|
|
msm.mu.Lock()
|
|
if msm.internalSessions == nil {
|
|
msm.internalSessions = make(map[any]struct{})
|
|
}
|
|
msm.internalSessions[s] = struct{}{}
|
|
msm.mu.Unlock()
|
|
}
|
|
|
|
// ContainsInternalSession checks if the internal session pointer is in the map in the Manager
|
|
func (msm *MockSessionManager) ContainsInternalSession(se any) bool {
|
|
msm.mu.Lock()
|
|
defer msm.mu.Unlock()
|
|
if msm.internalSessions == nil {
|
|
return false
|
|
}
|
|
_, ok := msm.internalSessions[se]
|
|
return ok
|
|
}
|
|
|
|
// InternalSessionCount implements the Manager.InternalSessionCount interface.
|
|
func (msm *MockSessionManager) InternalSessionCount() int {
|
|
msm.mu.Lock()
|
|
defer msm.mu.Unlock()
|
|
return len(msm.internalSessions)
|
|
}
|
|
|
|
// DeleteInternalSession is to delete the internal session pointer from the map in the Manager
|
|
func (msm *MockSessionManager) DeleteInternalSession(s any) {
|
|
msm.mu.Lock()
|
|
delete(msm.internalSessions, s)
|
|
msm.mu.Unlock()
|
|
}
|
|
|
|
// GetInternalSessionStartTSList is to get all startTS of every transaction running in the current internal sessions
|
|
func (msm *MockSessionManager) GetInternalSessionStartTSList() []uint64 {
|
|
msm.mu.Lock()
|
|
defer msm.mu.Unlock()
|
|
ret := make([]uint64, 0, len(msm.internalSessions))
|
|
for internalSess := range msm.internalSessions {
|
|
// Ref the implementation of `GetInternalSessionStartTSList` on the real session manager. The `TxnInfo` is more
|
|
// accurate, because if a session is pending, the `StartTS` in `sessVars.TxnCtx` will not be updated. For example,
|
|
// if there is not DDL for a long time, the minimal internal session start ts will not have any progress.
|
|
if se, ok := internalSess.(interface{ TxnInfo() *txninfo.TxnInfo }); ok {
|
|
txn := se.TxnInfo()
|
|
if txn != nil {
|
|
ret = append(ret, txn.StartTS)
|
|
}
|
|
continue
|
|
}
|
|
}
|
|
return ret
|
|
}
|
|
|
|
// KillNonFlashbackClusterConn implement Manager interface.
|
|
func (msm *MockSessionManager) KillNonFlashbackClusterConn() {
|
|
for _, se := range msm.Conn {
|
|
processInfo := se.ShowProcess()
|
|
ddl, ok := processInfo.StmtCtx.GetPlan().(*core.DDL)
|
|
if !ok {
|
|
msm.Kill(se.GetSessionVars().ConnectionID, false, false, false)
|
|
continue
|
|
}
|
|
_, ok = ddl.Statement.(*ast.FlashBackToTimestampStmt)
|
|
if !ok {
|
|
msm.Kill(se.GetSessionVars().ConnectionID, false, false, false)
|
|
continue
|
|
}
|
|
}
|
|
}
|
|
|
|
// CheckOldRunningTxn is to get all startTS of every transactions running in the current internal sessions
|
|
func (msm *MockSessionManager) CheckOldRunningTxn(jobs map[int64]*mdldef.JobMDL) {
|
|
msm.mu.Lock()
|
|
for _, se := range msm.Conn {
|
|
variable.RemoveLockDDLJobs(se.GetSessionVars(), jobs, false)
|
|
}
|
|
msm.mu.Unlock()
|
|
}
|
|
|
|
// GetStatusVars is getting the per-session status variables
|
|
func (msm *MockSessionManager) GetStatusVars() map[uint64]map[string]string {
|
|
return map[uint64]map[string]string{}
|
|
}
|