// Copyright 2015 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Copyright 2013 The ql Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSES/QL-LICENSE file.

package session

import (
	"context"
	"time"

	"github.com/ngaut/pools"
	"github.com/pingcap/errors"
	"github.com/pingcap/failpoint"
	"github.com/pingcap/tidb/pkg/config"
	"github.com/pingcap/tidb/pkg/ddl"
	"github.com/pingcap/tidb/pkg/ddl/schematracker"
	"github.com/pingcap/tidb/pkg/domain"
	"github.com/pingcap/tidb/pkg/errno"
	"github.com/pingcap/tidb/pkg/executor"
	"github.com/pingcap/tidb/pkg/infoschema"
	"github.com/pingcap/tidb/pkg/infoschema/issyncer"
	"github.com/pingcap/tidb/pkg/infoschema/validatorapi"
	"github.com/pingcap/tidb/pkg/kv"
	"github.com/pingcap/tidb/pkg/parser"
	"github.com/pingcap/tidb/pkg/parser/ast"
	session_metrics "github.com/pingcap/tidb/pkg/session/metrics"
	"github.com/pingcap/tidb/pkg/session/sessionapi"
	"github.com/pingcap/tidb/pkg/sessionctx"
	"github.com/pingcap/tidb/pkg/sessionctx/vardef"
	"github.com/pingcap/tidb/pkg/sessionctx/variable"
	"github.com/pingcap/tidb/pkg/sessiontxn"
	"github.com/pingcap/tidb/pkg/util"
	"github.com/pingcap/tidb/pkg/util/chunk"
	"github.com/pingcap/tidb/pkg/util/dbterror"
	"github.com/pingcap/tidb/pkg/util/dbterror/exeerrors"
	"github.com/pingcap/tidb/pkg/util/logutil"
	"github.com/pingcap/tidb/pkg/util/sqlexec"
	"github.com/pingcap/tidb/pkg/util/syncutil"
	clientv3 "go.etcd.io/etcd/client/v3"
	"go.uber.org/zap"
)

// StoreBootstrappedKey is used by store.Get/SetOption to store the related bootstrap context for kv.Storage.
const StoreBootstrappedKey = "bootstrap"

type domainMap struct {
	mu      syncutil.Mutex
	domains map[string]*domain.Domain
}

// Get gets or creates the domain for store.
// TODO: decouple domain creation from this method; it is clearer to create the domain
// explicitly before any usage of it.
func (dm *domainMap) Get(store kv.Storage) (d *domain.Domain, err error) {
	return dm.getWithEtcdClient(store, nil, nil)
}

// GetOrCreateWithFilter gets or creates the domain for store with the given schema filter.
//
// Caveat: if a domain has already been opened for `store`, the filter passed in is ignored and
// the actual schema filter of the returned `Domain` is the one used when the domain was created.
func (dm *domainMap) GetOrCreateWithFilter(store kv.Storage, filter issyncer.Filter) (d *domain.Domain, err error) {
	return dm.getWithEtcdClient(store, nil, filter)
}

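// The following is a hedged usage sketch, not part of the original file: it shows how a
// caller might fetch the cached (or lazily created) domain for an opened kv.Storage via
// the package-level domap. The helper name getDomainForStore is hypothetical.
func getDomainForStore(store kv.Storage) (*domain.Domain, error) {
	// domap keys domains by store.UUID(), so repeated calls for the same store
	// return the same *domain.Domain instance.
	return domap.Get(store)
}
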
func (dm *domainMap) getWithEtcdClient(store kv.Storage, etcdClient *clientv3.Client, schemaFilter issyncer.Filter) (d *domain.Domain, err error) {
	dm.mu.Lock()
	defer dm.mu.Unlock()

	if store == nil {
		for _, d := range dm.domains {
			// return an available domain if any
			return d, nil
		}
		return nil, errors.New("can not find available domain for a nil store")
	}

	key := store.UUID()
	d = dm.domains[key]
	if d != nil {
		return
	}

	ddlLease := vardef.GetSchemaLease()
	statisticLease := vardef.GetStatsLease()
	planReplayerGCLease := vardef.GetPlanReplayerGCLease()
	err = util.RunWithRetry(util.DefaultMaxRetries, util.RetryInterval, func() (retry bool, err1 error) {
		logutil.BgLogger().Info("new domain",
			zap.String("store", store.UUID()),
			zap.Stringer("ddl lease", ddlLease),
			zap.Stringer("stats lease", statisticLease))
		factory := getSessionFactory(store)
		sysFactory := getSessionFactoryWithDom(store)
		d = domain.NewDomainWithEtcdClient(store, ddlLease, statisticLease, planReplayerGCLease, factory,
			func(targetKS string, schemaValidator validatorapi.Validator) pools.Factory {
				return getCrossKSSessionFactory(store, targetKS, schemaValidator)
			},
			etcdClient,
			schemaFilter,
		)

		var ddlInjector func(ddl.DDL, ddl.Executor, *infoschema.InfoCache) *schematracker.Checker
		if injector, ok := store.(schematracker.StorageDDLInjector); ok {
			ddlInjector = injector.Injector
		}
		err1 = d.Init(sysFactory, ddlInjector)
		if err1 != nil {
			// If we don't clean up here, dirty data is left behind when Init is retried.
			d.Close()
			logutil.BgLogger().Error("init domain failed", zap.String("category", "ddl"),
				zap.Error(err1))
		}
		return true, err1
	})
	if err != nil {
		return nil, err
	}
	dm.domains[key] = d
	d.SetOnClose(func() {
		dm.Delete(store)
	})

	return
}

func (dm *domainMap) Delete(store kv.Storage) {
	dm.mu.Lock()
	delete(dm.domains, store.UUID())
	dm.mu.Unlock()
}

var (
	domap = &domainMap{
		domains: map[string]*domain.Domain{},
	}
)

// ResetStoreForWithTiKVTest is only used in the test code.
// TODO: Remove domap and storeBootstrapped. Use store.SetOption() to do it.
func ResetStoreForWithTiKVTest(store kv.Storage) {
	domap.Delete(store)
	store.SetOption(StoreBootstrappedKey, nil)
}

// DisableStats4Test disables the stats for tests.
func DisableStats4Test() {
	vardef.SetStatsLease(-1)
}

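// A hedged usage sketch, not part of the original file: tests that do not need background
// statistics maintenance typically call DisableStats4Test once, before any store or domain
// is created, so the negative stats lease is in effect for every domain created afterwards.
// The TestMain shape below is illustrative only (it assumes an external test package):
//
//	func TestMain(m *testing.M) {
//		session.DisableStats4Test()
//		os.Exit(m.Run())
//	}
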
// Parse parses a query string to raw ast.StmtNode.
func Parse(ctx sessionctx.Context, src string) ([]ast.StmtNode, error) {
	logutil.BgLogger().Debug("compiling", zap.String("source", src))
	sessVars := ctx.GetSessionVars()
	p := parser.New()
	p.SetParserConfig(sessVars.BuildParserConfig())
	p.SetSQLMode(sessVars.SQLMode)
	stmts, warns, err := p.ParseSQL(src, sessVars.GetParseParams()...)
	for _, warn := range warns {
		sessVars.StmtCtx.AppendWarning(warn)
	}
	if err != nil {
		logutil.BgLogger().Warn("compiling",
			zap.String("source", src),
			zap.Error(err))
		return nil, err
	}
	return stmts, nil
}

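// The following is a hedged sketch, not part of the original file: it wraps Parse for the
// common test-code pattern of parsing exactly one statement. The helper name
// parseSingleStmt is hypothetical; the sessionctx.Context is assumed to come from a
// bootstrapped test session.
func parseSingleStmt(ctx sessionctx.Context, sql string) (ast.StmtNode, error) {
	stmts, err := Parse(ctx, sql)
	if err != nil {
		return nil, err
	}
	if len(stmts) != 1 {
		return nil, errors.Errorf("expected exactly one statement, got %d", len(stmts))
	}
	return stmts[0], nil
}
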
func recordAbortTxnDuration(sessVars *variable.SessionVars, isInternal bool) {
	duration := time.Since(sessVars.TxnCtx.CreateTime).Seconds()
	if sessVars.TxnCtx.IsPessimistic {
		if isInternal {
			session_metrics.TransactionDurationPessimisticAbortInternal.Observe(duration)
		} else {
			session_metrics.TransactionDurationPessimisticAbortGeneral.Observe(duration)
		}
	} else {
		if isInternal {
			session_metrics.TransactionDurationOptimisticAbortInternal.Observe(duration)
		} else {
			session_metrics.TransactionDurationOptimisticAbortGeneral.Observe(duration)
		}
	}
}

func finishStmt(ctx context.Context, se *session, meetsErr error, sql sqlexec.Statement) error {
	failpoint.Inject("finishStmtError", func() {
		failpoint.Return(errors.New("occur an error after finishStmt"))
	})
	sessVars := se.sessionVars
	if !sql.IsReadOnly(sessVars) {
		// All the history should be added here.
		if meetsErr == nil && sessVars.TxnCtx.CouldRetry {
			GetHistory(se).Add(sql, sessVars.StmtCtx)
		}

		// Handle the stmt commit/rollback.
		if se.txn.Valid() {
			if meetsErr != nil {
				se.StmtRollback(ctx, false)
			} else {
				se.StmtCommit(ctx)
			}
		}
	}
	err := autoCommitAfterStmt(ctx, se, meetsErr, sql)
	if se.txn.pending() {
		// If the txn state is still pending after the statement has finished running,
		// the statement never needed a Txn(), for example:
		//
		//   set @@tidb_general_log = 1
		//   set @@autocommit = 0
		//   select 1
		//
		// Reset the txn state to invalid to dispose of the pending start ts.
		se.txn.changeToInvalid()
	}
	if err != nil {
		return err
	}
	return checkStmtLimit(ctx, se, true)
}

func autoCommitAfterStmt(ctx context.Context, se *session, meetsErr error, sql sqlexec.Statement) error {
	isInternal := false
	if internal := se.txn.GetOption(kv.RequestSourceInternal); internal != nil && internal.(bool) {
		isInternal = true
	}
	sessVars := se.sessionVars
	if meetsErr != nil {
		if !sessVars.InTxn() {
			logutil.BgLogger().Info("rollbackTxn called due to ddl/autocommit failure")
			se.RollbackTxn(ctx)
			recordAbortTxnDuration(sessVars, isInternal)
		} else if se.txn.Valid() && se.txn.IsPessimistic() && exeerrors.ErrDeadlock.Equal(meetsErr) {
			logutil.BgLogger().Info("rollbackTxn for deadlock", zap.Uint64("txn", se.txn.StartTS()))
			se.RollbackTxn(ctx)
			recordAbortTxnDuration(sessVars, isInternal)
		}
		return meetsErr
	}

	if !sessVars.InTxn() {
		if err := se.CommitTxn(ctx); err != nil {
			if _, ok := sql.(*executor.ExecStmt).StmtNode.(*ast.CommitStmt); ok {
				err = errors.Annotatef(err, "previous statement: %s", se.GetSessionVars().PrevStmt)
			}
			return err
		}
		return nil
	}
	return nil
}

func checkStmtLimit(ctx context.Context, se *session, isFinish bool) error {
	// If the user keeps writing (insert, insert, insert ...) but never commits, TiDB would OOM.
	// So we limit the statement count in a transaction here.
	var err error
	sessVars := se.GetSessionVars()
	history := GetHistory(se)
	stmtCount := history.Count()
	if !isFinish {
		// history stmt count + current stmt: the current stmt has not finished yet,
		// so it has not been added to the history.
		stmtCount++
	}
	if stmtCount > int(config.GetGlobalConfig().Performance.StmtCountLimit) {
		if !sessVars.BatchCommit {
			se.RollbackTxn(ctx)
			return errors.Errorf("statement count %d exceeds the transaction limitation, transaction has been rollback, autocommit = %t",
				stmtCount, sessVars.IsAutocommit())
		}
		if !isFinish {
			// If the stmt has not finished executing, just return, since some work
			// such as StmtCommit still needs to be done first.
			return nil
		}
		// If the stmt has finished executing, the count exceeds StmtCountLimit, and
		// BatchCommit is enabled, commit the current transaction and create a new one.
		err = sessiontxn.NewTxn(ctx, se)
		// The new transaction is not committed yet, so keep the session in a transaction.
		// The last statement in the history cannot be a "commit"/"rollback" statement,
		// so we cannot be starting a new transaction right after the old one ended:
		// once the server has executed "commit"/"rollback", the session is out of the transaction.
		sessVars.SetInTxn(true)
	}
	return err
}

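// The following is a hedged sketch, not part of the original file: it shows where the limit
// checked above comes from and how a caller could pre-check it. The helper name
// wouldExceedStmtLimit is hypothetical; it reuses GetHistory and the same
// Performance.StmtCountLimit config item read by checkStmtLimit.
func wouldExceedStmtLimit(ctx sessionctx.Context) bool {
	// One more statement on top of what is already recorded in the current transaction.
	next := GetHistory(ctx).Count() + 1
	return next > int(config.GetGlobalConfig().Performance.StmtCountLimit)
}
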
// GetHistory gets all stmtHistory in the current txn. Exported only for test.
// If stmtHistory is nil, a new one is created for the current txn.
func GetHistory(ctx sessionctx.Context) *StmtHistory {
	hist, ok := ctx.GetSessionVars().TxnCtx.History.(*StmtHistory)
	if ok {
		return hist
	}
	hist = new(StmtHistory)
	ctx.GetSessionVars().TxnCtx.History = hist
	return hist
}

// GetRows4Test gets all the rows from a RecordSet, only used for test.
func GetRows4Test(ctx context.Context, _ sessionctx.Context, rs sqlexec.RecordSet) ([]chunk.Row, error) {
	if rs == nil {
		return nil, nil
	}
	var rows []chunk.Row
	req := rs.NewChunk(nil)
	// Must reuse `req` to imitate server.(*clientConn).writeChunks.
	for {
		err := rs.Next(ctx, req)
		if err != nil {
			return nil, err
		}
		if req.NumRows() == 0 {
			break
		}

		iter := chunk.NewIterator4Chunk(req.CopyConstruct())
		for row := iter.Begin(); row != iter.End(); row = iter.Next() {
			rows = append(rows, row)
		}
	}
	return rows, nil
}

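// The following is a hedged sketch, not part of the original file: a lighter variant of
// GetRows4Test for tests that only need the number of rows, so the chunk does not have to
// be copied on every iteration. The helper name countRows4Test is hypothetical.
func countRows4Test(ctx context.Context, rs sqlexec.RecordSet) (int, error) {
	if rs == nil {
		return 0, nil
	}
	var n int
	req := rs.NewChunk(nil)
	for {
		// Reuse `req` across iterations, mirroring GetRows4Test above.
		if err := rs.Next(ctx, req); err != nil {
			return 0, err
		}
		if req.NumRows() == 0 {
			return n, nil
		}
		n += req.NumRows()
	}
}
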
// ResultSetToStringSlice changes the RecordSet to [][]string.
func ResultSetToStringSlice(ctx context.Context, s sessionapi.Session, rs sqlexec.RecordSet) ([][]string, error) {
	rows, err := GetRows4Test(ctx, s, rs)
	if err != nil {
		return nil, err
	}
	err = rs.Close()
	if err != nil {
		return nil, err
	}
	sRows := make([][]string, len(rows))
	for i := range rows {
		row := rows[i]
		iRow := make([]string, row.Len())
		for j := range row.Len() {
			if row.IsNull(j) {
				iRow[j] = "<nil>"
			} else {
				d := row.GetDatum(j, &rs.Fields()[j].Column.FieldType)
				iRow[j], err = d.ToString()
				if err != nil {
					return nil, err
				}
			}
		}
		sRows[i] = iRow
	}
	return sRows, nil
}

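// The following is a hedged sketch, not part of the original file: it flattens the
// [][]string produced by ResultSetToStringSlice into one newline-separated string, a
// convenience when logging or asserting query output in tests. The helper name
// resultSetToDebugString is hypothetical.
func resultSetToDebugString(ctx context.Context, s sessionapi.Session, rs sqlexec.RecordSet) (string, error) {
	rows, err := ResultSetToStringSlice(ctx, s, rs)
	if err != nil {
		return "", err
	}
	var out string
	for _, row := range rows {
		for j, col := range row {
			if j > 0 {
				out += " " // separate columns with a single space
			}
			out += col
		}
		out += "\n" // one line per row
	}
	return out, nil
}
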
// Session errors.
var (
	ErrForUpdateCantRetry = dbterror.ClassSession.NewStd(errno.ErrForUpdateCantRetry)
)