// Source file: tidb/pkg/server/conn_stmt.go
// Copyright 2015 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Copyright 2013 The Go-MySQL-Driver Authors. All rights reserved.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at http://mozilla.org/MPL/2.0/.
// The MIT License (MIT)
//
// Copyright (c) 2014 wandoulabs
// Copyright (c) 2014 siddontang
//
// Permission is hereby granted, free of charge, to any person obtaining a copy of
// this software and associated documentation files (the "Software"), to deal in
// the Software without restriction, including without limitation the rights to
// use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
// the Software, and to permit persons to whom the Software is furnished to do so,
// subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
package server
import (
"context"
"encoding/binary"
"runtime/trace"
"strconv"
"time"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/param"
"github.com/pingcap/tidb/pkg/parser"
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/parser/charset"
"github.com/pingcap/tidb/pkg/parser/mysql"
plannercore "github.com/pingcap/tidb/pkg/planner/core"
"github.com/pingcap/tidb/pkg/plugin"
"github.com/pingcap/tidb/pkg/server/internal/dump"
"github.com/pingcap/tidb/pkg/server/internal/parse"
"github.com/pingcap/tidb/pkg/server/internal/resultset"
"github.com/pingcap/tidb/pkg/sessionctx/vardef"
"github.com/pingcap/tidb/pkg/sessiontxn"
storeerr "github.com/pingcap/tidb/pkg/store/driver/error"
"github.com/pingcap/tidb/pkg/util/chunk"
"github.com/pingcap/tidb/pkg/util/execdetails"
"github.com/pingcap/tidb/pkg/util/logutil"
"github.com/pingcap/tidb/pkg/util/memory"
"github.com/pingcap/tidb/pkg/util/redact"
"github.com/pingcap/tidb/pkg/util/topsql"
topsqlstate "github.com/pingcap/tidb/pkg/util/topsql/state"
"go.uber.org/zap"
)
// HandleStmtPrepare handles the COM_STMT_PREPARE command: it prepares the
// statement in the session, then replies with the prepare-OK packet followed
// by the parameter and column definitions (with EOF markers for old clients).
func (cc *clientConn) HandleStmtPrepare(ctx context.Context, sql string) error {
	stmt, columns, params, err := cc.ctx.Prepare(sql)
	if err != nil {
		return err
	}

	// Build the COM_STMT_PREPARE_OK response header.
	packet := make([]byte, 4, 128)
	packet = append(packet, 0)                         // status ok
	packet = dump.Uint32(packet, uint32(stmt.ID()))    // statement id
	packet = dump.Uint16(packet, uint16(len(columns))) // number of columns
	packet = dump.Uint16(packet, uint16(len(params)))  // number of params
	packet = append(packet, 0)                         // filter [00]
	packet = append(packet, 0, 0)                      // warning count; TODO support warning count
	if err = cc.writePacket(packet); err != nil {
		return err
	}

	cc.initResultEncoder(ctx)
	defer cc.rsEncoder.Clean()

	if len(params) > 0 {
		for i := range params {
			packet = packet[0:4]
			packet = params[i].Dump(packet, cc.rsEncoder)
			if err = cc.writePacket(packet); err != nil {
				return err
			}
		}
		// Old clients without ClientDeprecateEOF expect an EOF marker after metadata.
		if cc.capability&mysql.ClientDeprecateEOF == 0 {
			if err = cc.writeEOF(ctx, cc.ctx.Status()); err != nil {
				return err
			}
		}
	}
	if len(columns) > 0 {
		for i := range columns {
			packet = packet[0:4]
			packet = columns[i].Dump(packet, cc.rsEncoder)
			if err = cc.writePacket(packet); err != nil {
				return err
			}
		}
		// Old clients without ClientDeprecateEOF expect an EOF marker after metadata.
		if cc.capability&mysql.ClientDeprecateEOF == 0 {
			if err = cc.writeEOF(ctx, cc.ctx.Status()); err != nil {
				return err
			}
		}
	}
	return cc.flush(ctx)
}
// handleStmtExecute handles the COM_STMT_EXECUTE command. Payload layout:
// stmt_id(4) | flags(1) | iteration_count(4) | null_bitmap | new_params_bound_flag(1)
// | [param types] | param values. After decoding the binary parameters it
// delegates the execution to executePlanCacheStmt.
func (cc *clientConn) handleStmtExecute(ctx context.Context, data []byte) (err error) {
	defer trace.StartRegion(ctx, "HandleStmtExecute").End()
	// The fixed prefix is 9 bytes: stmt_id(4) + flags(1) + iteration_count(4).
	if len(data) < 9 {
		return mysql.ErrMalformPacket
	}
	pos := 0
	stmtID := binary.LittleEndian.Uint32(data[0:4])
	pos += 4

	stmt := cc.ctx.GetStatement(int(stmtID))
	if stmt == nil {
		return mysql.NewErr(mysql.ErrUnknownStmtHandler,
			strconv.FormatUint(uint64(stmtID), 10), "stmt_execute")
	}

	flag := data[pos]
	pos++
	// Please refer to https://dev.mysql.com/doc/dev/mysql-server/latest/page_protocol_com_stmt_execute.html
	// The client indicates that it wants to use cursor by setting this flag.
	// Now we only support forward-only, read-only cursor.
	useCursor := false
	if flag&mysql.CursorTypeReadOnly > 0 {
		useCursor = true
	}
	if flag&mysql.CursorTypeForUpdate > 0 {
		return mysql.NewErrf(mysql.ErrUnknown, "unsupported flag: CursorTypeForUpdate", nil)
	}
	if flag&mysql.CursorTypeScrollable > 0 {
		return mysql.NewErrf(mysql.ErrUnknown, "unsupported flag: CursorTypeScrollable", nil)
	}

	if useCursor {
		cc.ctx.GetSessionVars().SetStatusFlag(mysql.ServerStatusCursorExists, true)
		defer cc.ctx.GetSessionVars().SetStatusFlag(mysql.ServerStatusCursorExists, false)
	} else {
		// not using streaming, can reuse chunk
		cc.ctx.GetSessionVars().SetAlloc(cc.chunkAlloc)
	}
	// skip iteration-count, always 1
	pos += 4

	var (
		nullBitmaps []byte
		paramTypes  []byte
		paramValues []byte
	)
	cc.initInputEncoder(ctx)
	numParams := stmt.NumParams()
	args := make([]param.BinaryParam, numParams)
	if numParams > 0 {
		// One NULL bit per parameter, rounded up to whole bytes.
		nullBitmapLen := (numParams + 7) >> 3
		if len(data) < (pos + nullBitmapLen + 1) {
			return mysql.ErrMalformPacket
		}
		nullBitmaps = data[pos : pos+nullBitmapLen]
		pos += nullBitmapLen

		// new param bound flag
		if data[pos] == 1 {
			pos++
			// Each parameter type occupies 2 bytes.
			if len(data) < (pos + (numParams << 1)) {
				return mysql.ErrMalformPacket
			}
			paramTypes = data[pos : pos+(numParams<<1)]
			pos += numParams << 1
			paramValues = data[pos:]
			// Just the first StmtExecute packet contain parameters type,
			// we need save it for further use.
			stmt.SetParamsType(paramTypes)
		} else {
			// Types were sent on an earlier EXECUTE; only values follow the flag byte.
			paramValues = data[pos+1:]
		}

		err = parseBinaryParams(args, stmt.BoundParams(), nullBitmaps, stmt.GetParamsType(), paramValues, cc.inputDecoder)
		// This `.Reset` resets the arguments, so it's fine to just ignore the error (and the it'll be reset again in the following routine)
		errReset := stmt.Reset()
		if errReset != nil {
			logutil.Logger(ctx).Warn("fail to reset statement in EXECUTE command", zap.Error(errReset))
		}
		if err != nil {
			return errors.Annotate(err, cc.preparedStmt2String(stmtID))
		}
	}

	sessVars := cc.ctx.GetSessionVars()
	// expiredTaskID is the task ID of the previous statement. When executing a stmt,
	// the StmtCtx will be reinit and the TaskID will change. We can compare the StmtCtx.TaskID
	// with the previous one to determine whether StmtCtx has been inited for the current stmt.
	expiredTaskID := sessVars.StmtCtx.TaskID
	err = cc.executePlanCacheStmt(ctx, stmt, args, useCursor)
	cc.onExtensionBinaryExecuteEnd(stmt, args, sessVars.StmtCtx.TaskID != expiredTaskID, err)
	return err
}
// executePlanCacheStmt executes a prepared statement (passed as `any`, must be
// a PreparedStatement) and writes the result to the client. On a retryable
// error it may re-execute once as directed by the transaction manager, and
// when TiFlash times out while fallback is allowed it retries with TiFlash
// temporarily removed from the isolation read engines.
func (cc *clientConn) executePlanCacheStmt(ctx context.Context, stmt any, args []param.BinaryParam, useCursor bool) (err error) {
	ctx = execdetails.ContextWithInitializedExecDetails(ctx)
	// Expose a liveness probe so a long-running statement can be killed when
	// the client connection has already gone away.
	fn := func() bool {
		if cc.bufReadConn != nil {
			return cc.bufReadConn.IsAlive() != 0
		}
		return true
	}
	cc.ctx.GetSessionVars().SQLKiller.IsConnectionAlive.Store(&fn)
	defer cc.ctx.GetSessionVars().SQLKiller.IsConnectionAlive.Store(nil)
	//nolint:forcetypeassert
	retryable, err := cc.executePreparedStmtAndWriteResult(ctx, stmt.(PreparedStatement), args, useCursor)
	if err != nil {
		// Ask the transaction manager whether this statement may be retried.
		action, txnErr := sessiontxn.GetTxnManager(&cc.ctx).OnStmtErrorForNextAction(ctx, sessiontxn.StmtErrAfterQuery, err)
		if txnErr != nil {
			return txnErr
		}
		if retryable && action == sessiontxn.StmtActionRetryReady {
			cc.ctx.GetSessionVars().RetryInfo.Retrying = true
			//nolint:forcetypeassert
			_, err = cc.executePreparedStmtAndWriteResult(ctx, stmt.(PreparedStatement), args, useCursor)
			cc.ctx.GetSessionVars().RetryInfo.Retrying = false
			return err
		}
	}
	_, allowTiFlashFallback := cc.ctx.GetSessionVars().AllowFallbackToTiKV[kv.TiFlash]
	if allowTiFlashFallback && err != nil && errors.ErrorEqual(err, storeerr.ErrTiFlashServerTimeout) && retryable {
		// When the TiFlash server seems down, we append a warning to remind the user to check the status of the TiFlash
		// server and fallback to TiKV.
		prevErr := err
		delete(cc.ctx.GetSessionVars().IsolationReadEngines, kv.TiFlash)
		defer func() {
			// Restore TiFlash as an isolation read engine after the fallback attempt.
			cc.ctx.GetSessionVars().IsolationReadEngines[kv.TiFlash] = struct{}{}
		}()
		//nolint:forcetypeassert
		_, err = cc.executePreparedStmtAndWriteResult(ctx, stmt.(PreparedStatement), args, useCursor)
		// We append warning after the retry because `ResetContextOfStmt` may be called during the retry, which clears warnings.
		cc.ctx.GetSessionVars().StmtCtx.AppendError(prevErr)
	}
	return err
}
// The first return value indicates whether the call of executePreparedStmtAndWriteResult has no side effect and can be retried.
// Currently the first return value is used to fallback to TiKV when TiFlash is down.
func (cc *clientConn) executePreparedStmtAndWriteResult(ctx context.Context, stmt PreparedStatement, args []param.BinaryParam, useCursor bool) (bool, error) {
	vars := (&cc.ctx).GetSessionVars()
	prepStmt, err := vars.GetPreparedStmtByID(uint32(stmt.ID()))
	if err != nil {
		return true, errors.Annotate(err, cc.preparedStmt2String(uint32(stmt.ID())))
	}
	execStmt := &ast.ExecuteStmt{
		BinaryArgs: args,
		PrepStmt:   prepStmt,
		PrepStmtId: uint32(stmt.ID()),
	}

	// first, try to clear the left cursor if there is one
	if useCursor && stmt.GetCursorActive() {
		if stmt.GetResultSet() != nil && stmt.GetResultSet().GetRowIterator() != nil {
			stmt.GetResultSet().GetRowIterator().Close()
		}
		if stmt.GetRowContainer() != nil {
			// Detach trackers before closing so the session trackers no longer
			// reference the container's memory/disk usage.
			stmt.GetRowContainer().GetMemTracker().Detach()
			stmt.GetRowContainer().GetDiskTracker().Detach()
			err := stmt.GetRowContainer().Close()
			if err != nil {
				logutil.Logger(ctx).Error(
					"Fail to close rowContainer before executing statement. May cause resource leak",
					zap.Error(err))
			}
			stmt.StoreRowContainer(nil)
		}
		stmt.StoreResultSet(nil)
		stmt.SetCursorActive(false)
	}

	// For the combination of `ComPrepare` and `ComExecute`, the statement name is stored in the client side, and the
	// TiDB only has the ID, so don't try to construct an `EXECUTE SOMETHING`. Use the original prepared statement here
	// instead.
	sql := ""
	planCacheStmt, ok := prepStmt.(*plannercore.PlanCacheStmt)
	if ok {
		sql = planCacheStmt.StmtText
	}
	execStmt.SetText(charset.EncodingUTF8Impl, sql)
	rs, err := (&cc.ctx).ExecuteStmt(ctx, execStmt)
	var lazy bool
	if rs != nil {
		defer func() {
			// A lazy cursor keeps the ResultSet alive for later FETCH commands,
			// so it must not be closed here.
			if !lazy {
				rs.Close()
			}
		}()
	}
	if err != nil {
		// If error is returned during the planner phase or the executor.Open
		// phase, the rs will be nil, and StmtCtx.MemTracker StmtCtx.DiskTracker
		// will not be detached. We need to detach them manually.
		if sv := cc.ctx.GetSessionVars(); sv != nil && sv.StmtCtx != nil {
			sv.StmtCtx.DetachMemDiskTracker()
		}
		return true, errors.Annotate(err, cc.preparedStmt2String(uint32(stmt.ID())))
	}
	if rs == nil {
		// No result set (e.g. a DML statement): reply with OK immediately.
		if useCursor {
			vars.SetStatusFlag(mysql.ServerStatusCursorExists, false)
		}
		return false, cc.writeOK(ctx)
	}
	if planCacheStmt, ok := prepStmt.(*plannercore.PlanCacheStmt); ok {
		rs.SetPreparedStmt(planCacheStmt)
	}

	// if the client wants to use cursor
	// we should hold the ResultSet in PreparedStatement for next stmt_fetch, and only send back ColumnInfo.
	// Tell the client cursor exists in server by setting proper serverStatus.
	if useCursor {
		lazy, err = cc.executeWithCursor(ctx, stmt, rs)
		return false, err
	}
	retryable, err := cc.writeResultSet(ctx, rs, true, cc.ctx.Status(), 0)
	if err != nil {
		return retryable, errors.Annotate(err, cc.preparedStmt2String(uint32(stmt.ID())))
	}
	return false, nil
}
// executeWithCursor serves a cursor-fetch execution. It first tries the lazy
// cursor-fetch path when enabled; otherwise it eagerly drains the whole result
// set into a spillable row container stored on the statement, so that later
// FETCH commands can read rows without running the executor again. The
// returned `lazy` flag reports whether `rs` is still owned by a lazy cursor
// (and therefore must not be closed by the caller).
func (cc *clientConn) executeWithCursor(ctx context.Context, stmt PreparedStatement, rs resultset.ResultSet) (lazy bool, err error) {
	vars := (&cc.ctx).GetSessionVars()
	if vars.EnableLazyCursorFetch {
		// try to execute with lazy cursor fetch
		ok, err := cc.executeWithLazyCursor(ctx, stmt, rs)
		// if `ok` is false, should try to execute without lazy cursor fetch
		if ok {
			return true, err
		}
	}

	failpoint.Inject("avoidEagerCursorFetch", func() {
		failpoint.Return(false, errors.New("failpoint avoids eager cursor fetch"))
	})
	cc.initResultEncoder(ctx)
	defer cc.rsEncoder.Clean()
	// fetch all results of the resultSet, and stored them locally, so that the future `FETCH` command can read
	// the rows directly to avoid running executor and accessing shared params/variables in the session
	// NOTE: chunk should not be allocated from the connection allocator, which will reset after executing this command
	// but the rows are still needed in the following FETCH command.

	// create the row container to manage spill
	// this `rowContainer` will be released when the statement (or the connection) is closed.
	rowContainer := chunk.NewRowContainer(rs.FieldTypes(), vars.MaxChunkSize)
	rowContainer.GetMemTracker().AttachTo(vars.MemTracker)
	rowContainer.GetMemTracker().SetLabel(memory.LabelForCursorFetch)
	rowContainer.GetDiskTracker().AttachTo(vars.DiskTracker)
	rowContainer.GetDiskTracker().SetLabel(memory.LabelForCursorFetch)
	if vardef.EnableTmpStorageOnOOM.Load() {
		failpoint.Inject("testCursorFetchSpill", func(val failpoint.Value) {
			if val, ok := val.(bool); val && ok {
				actionSpill := rowContainer.ActionSpillForTest()
				defer actionSpill.WaitForTest()
			}
		})
		// Allow spilling the cursor data to disk under memory pressure.
		action := memory.NewActionWithPriority(rowContainer.ActionSpill(), memory.DefCursorFetchSpillPriority)
		vars.MemTracker.FallbackOldAndSetNewAction(action)
	}
	// store the rowContainer in the statement right after it's created, so that even if the logic in defer is not triggered,
	// the rowContainer will be released when the statement is closed.
	stmt.StoreRowContainer(rowContainer)
	defer func() {
		if err != nil {
			// if the execution panic, it'll not reach this branch. The `rowContainer` will be released in the `stmt.Close`.
			stmt.StoreRowContainer(nil)
			rowContainer.GetMemTracker().Detach()
			rowContainer.GetDiskTracker().Detach()
			errCloseRowContainer := rowContainer.Close()
			if errCloseRowContainer != nil {
				logutil.Logger(ctx).Error("Fail to close rowContainer in error handler. May cause resource leak",
					zap.NamedError("original-error", err), zap.NamedError("close-error", errCloseRowContainer))
			}
		}
	}()

	// Drain the executor: read chunks until an empty one marks the end.
	for {
		chk := rs.NewChunk(nil)
		if err = rs.Next(ctx, chk); err != nil {
			return false, err
		}
		rowCount := chk.NumRows()
		if rowCount == 0 {
			break
		}
		err = rowContainer.Add(chk)
		if err != nil {
			return false, err
		}
	}

	reader := chunk.NewRowContainerReader(rowContainer)
	defer func() {
		if err != nil {
			reader.Close()
		}
	}()
	crs := resultset.WrapWithRowContainerCursor(rs, reader)
	if cl, ok := crs.(resultset.FetchNotifier); ok {
		cl.OnFetchReturned()
	}

	err = cc.writeExecuteResultWithCursor(ctx, stmt, crs)
	return false, err
}
// executeWithLazyCursor tries to detach the `ResultSet` and make it suitable to execute lazily.
// Be careful that the return value `(bool, error)` has different meaning with other similar functions. The first `bool` represent whether
// the `ResultSet` is suitable for lazy execution. If the return value is `(false, _)`, the `rs` in argument can still be used. If the
// first return value is `true` and `err` is not nil, the `rs` cannot be used anymore and should return the error to the upper layer.
func (cc *clientConn) executeWithLazyCursor(ctx context.Context, stmt PreparedStatement, rs resultset.ResultSet) (ok bool, err error) {
	drs, detached, err := rs.TryDetach()
	if err != nil || !detached {
		return false, err
	}
	sessVars := (&cc.ctx).GetSessionVars()
	cursorRS := resultset.WrapWithLazyCursor(drs, sessVars.InitChunkSize, sessVars.MaxChunkSize)
	return true, cc.writeExecuteResultWithCursor(ctx, stmt, cursorRS)
}
// writeExecuteResultWithCursor stores the `ResultSet` in `stmt` as the active
// cursor and sends only the column metadata back to the client. The logic is
// shared between lazy cursor fetch and normal (eager) cursor fetch.
func (cc *clientConn) writeExecuteResultWithCursor(ctx context.Context, stmt PreparedStatement, rs resultset.CursorResultSet) (err error) {
	stmt.StoreResultSet(rs)
	stmt.SetCursorActive(true)
	defer func() {
		if err == nil {
			return
		}
		// the resultSet and rowContainer have been closed in former "defer" statement.
		stmt.StoreResultSet(nil)
		stmt.SetCursorActive(false)
	}()

	if err = cc.writeColumnInfo(rs.Columns()); err != nil {
		return err
	}
	// explicitly flush columnInfo to client.
	if err = cc.writeEOF(ctx, cc.ctx.Status()); err != nil {
		return err
	}
	return cc.flush(ctx)
}
// handleStmtFetch handles the COM_STMT_FETCH command: it sends up to fetchSize
// rows from the statement's active cursor to the client, and resets the
// statement once the cursor reaches the end or any error occurs.
func (cc *clientConn) handleStmtFetch(ctx context.Context, data []byte) (err error) {
	cc.ctx.GetSessionVars().StartTime = time.Now()
	cc.ctx.GetSessionVars().ClearAlloc(nil, false)
	cc.ctx.GetSessionVars().SetStatusFlag(mysql.ServerStatusCursorExists, true)
	defer cc.ctx.GetSessionVars().SetStatusFlag(mysql.ServerStatusCursorExists, false)

	// Reset the warn count. TODO: consider whether it's better to reset the whole session context/statement context.
	if cc.ctx.GetSessionVars().StmtCtx != nil {
		cc.ctx.GetSessionVars().StmtCtx.SetWarnings(nil)
	}
	cc.ctx.GetSessionVars().SysErrorCount = 0
	cc.ctx.GetSessionVars().SysWarningCount = 0

	// Parse the packet: 4-byte statement ID followed by the fetch size.
	stmtID, fetchSize, err := parse.StmtFetchCmd(data)
	if err != nil {
		return err
	}

	stmt := cc.ctx.GetStatement(int(stmtID))
	if stmt == nil {
		return errors.Annotate(mysql.NewErr(mysql.ErrUnknownStmtHandler,
			strconv.FormatUint(uint64(stmtID), 10), "stmt_fetch"), cc.preparedStmt2String(stmtID))
	}
	if !stmt.GetCursorActive() {
		return errors.Annotate(mysql.NewErr(mysql.ErrSpCursorNotOpen), cc.preparedStmt2String(stmtID))
	}
	// from now on, we have made sure: the statement has an active cursor
	// then if facing any error, this cursor should be reset
	defer func() {
		if err != nil {
			errReset := stmt.Reset()
			if errReset != nil {
				logutil.Logger(ctx).Error("Fail to reset statement in error handler. May cause resource leak.",
					zap.NamedError("original-error", err), zap.NamedError("reset-error", errReset))
			}
		}
	}()

	if topsqlstate.TopSQLEnabled() {
		// Re-attach the SQL info for Top SQL, as FETCH arrives as a separate command.
		prepareObj, _ := cc.preparedStmtID2CachePreparedStmt(stmtID)
		if prepareObj != nil && prepareObj.SQLDigest != nil {
			ctx = topsql.AttachAndRegisterSQLInfo(ctx, prepareObj.NormalizedSQL, prepareObj.SQLDigest, false)
		}
	}
	sql := ""
	if prepared, ok := cc.ctx.GetStatement(int(stmtID)).(*TiDBStatement); ok {
		sql = prepared.sql
	}
	cc.ctx.SetProcessInfo(sql, time.Now(), mysql.ComStmtExecute, 0)
	rs := stmt.GetResultSet()

	_, err = cc.writeResultSet(ctx, rs, true, cc.ctx.Status(), int(fetchSize))
	// if the iterator reached the end before writing result, we could say the `FETCH` command will send EOF
	if rs.GetRowIterator().Current(ctx) == rs.GetRowIterator().End() {
		// also reset the statement when the cursor reaches the end
		// don't overwrite the `err` in outer scope, to avoid redundant `Reset()` in `defer` statement (though, it's not
		// a big problem, as the `Reset()` function call is idempotent.)
		err := stmt.Reset()
		if err != nil {
			logutil.Logger(ctx).Error("Fail to reset statement when FETCH command reaches the end. May cause resource leak",
				zap.NamedError("error", err))
		}
	}
	if err != nil {
		return errors.Annotate(err, cc.preparedStmt2String(stmtID))
	}
	return nil
}
// handleStmtClose handles the COM_STMT_CLOSE command. Per protocol the client
// expects no response, so malformed packets and unknown statement IDs are
// silently ignored.
func (cc *clientConn) handleStmtClose(data []byte) error {
	if len(data) < 4 {
		return nil
	}
	id := int(binary.LittleEndian.Uint32(data[0:4]))
	stmt := cc.ctx.GetStatement(id)
	if stmt == nil {
		return nil
	}
	closeErr := stmt.Close()
	// Notify audit plugins that this prepared statement has completed.
	auditCtx := context.WithValue(context.Background(), plugin.PrepareStmtIDCtxKey, uint32(id))
	cc.audit(auditCtx, plugin.Completed)
	return closeErr
}
// handleStmtSendLongData handles the COM_STMT_SEND_LONG_DATA command, which
// appends a chunk of data to one parameter of a prepared statement.
func (cc *clientConn) handleStmtSendLongData(data []byte) error {
	// Minimum payload: 4-byte statement ID + 2-byte parameter ID.
	if len(data) < 6 {
		return mysql.ErrMalformPacket
	}
	id := int(binary.LittleEndian.Uint32(data[0:4]))
	stmt := cc.ctx.GetStatement(id)
	if stmt == nil {
		return mysql.NewErr(mysql.ErrUnknownStmtHandler,
			strconv.Itoa(id), "stmt_send_longdata")
	}
	paramID := int(binary.LittleEndian.Uint16(data[4:6]))
	return stmt.AppendParam(paramID, data[6:])
}
// handleStmtReset handles the COM_STMT_RESET command.
// A reset command should reset the statement to the state when it was right after prepare.
// Then the following state should be cleared:
//  1. The opened cursor, including the rowContainer (and its cursor/memTracker).
//  2. The argument sent through `SEND_LONG_DATA`.
func (cc *clientConn) handleStmtReset(ctx context.Context, data []byte) error {
	if len(data) < 4 {
		return mysql.ErrMalformPacket
	}
	stmtID := int(binary.LittleEndian.Uint32(data[0:4]))
	stmt := cc.ctx.GetStatement(stmtID)
	if stmt == nil {
		return mysql.NewErr(mysql.ErrUnknownStmtHandler,
			strconv.Itoa(stmtID), "stmt_reset")
	}
	if err := stmt.Reset(); err != nil {
		// Both server and client cannot handle the error case well, so just log an error and return OK.
		// It's fine to receive further `EXECUTE` command even the `Reset` function call failed.
		// NOTE: the original log passed the same error twice under the names "original-error" and
		// "close-error"; log it once instead.
		logutil.Logger(ctx).Error("Fail to reset statement in RESET command. May cause resource leak",
			zap.Error(err))
	}
	return cc.writeOK(ctx)
}
// handleSetOption handles the COM_SET_OPTION command, which toggles the
// CLIENT_MULTI_STATEMENTS capability for this connection.
// Refer to https://dev.mysql.com/doc/internals/en/com-set-option.html
func (cc *clientConn) handleSetOption(ctx context.Context, data []byte) error {
	if len(data) < 2 {
		return mysql.ErrMalformPacket
	}
	opt := binary.LittleEndian.Uint16(data[:2])
	switch opt {
	case 0: // MYSQL_OPTION_MULTI_STATEMENTS_ON
		cc.capability |= mysql.ClientMultiStatements
	case 1: // MYSQL_OPTION_MULTI_STATEMENTS_OFF
		cc.capability &^= mysql.ClientMultiStatements
	default:
		return mysql.ErrMalformPacket
	}
	cc.ctx.SetClientCapability(cc.capability)
	if err := cc.writeEOF(ctx, cc.ctx.Status()); err != nil {
		return err
	}
	return cc.flush(ctx)
}
// preparedStmt2String returns a readable representation of the prepared
// statement identified by stmtID, appending its bound parameters unless
// redact logging is fully enabled.
func (cc *clientConn) preparedStmt2String(stmtID uint32) string {
	sv := cc.ctx.GetSessionVars()
	if sv == nil {
		return ""
	}
	mode := sv.EnableRedactLog
	sql := parser.Normalize(cc.preparedStmt2StringNoArgs(stmtID), mode)
	if mode != errors.RedactLogEnable {
		sql += redact.String(mode, sv.PlanCacheParams.String())
	}
	return sql
}
// preparedStmt2StringNoArgs returns the text of the prepared statement
// identified by stmtID, without its bound parameters.
func (cc *clientConn) preparedStmt2StringNoArgs(stmtID uint32) string {
	sv := cc.ctx.GetSessionVars()
	if sv == nil {
		return ""
	}
	preparedObj, invalid := cc.preparedStmtID2CachePreparedStmt(stmtID)
	switch {
	case invalid:
		return "invalidate PlanCacheStmt type, ID: " + strconv.FormatUint(uint64(stmtID), 10)
	case preparedObj == nil:
		return "prepared statement not found, ID: " + strconv.FormatUint(uint64(stmtID), 10)
	}
	return preparedObj.PreparedAst.Stmt.Text()
}
// preparedStmtID2CachePreparedStmt looks up the PlanCacheStmt for stmtID in
// the session. It returns (nil, false) when the session or statement is
// missing, and (nil, true) when the cached object has an unexpected type
// (which should never happen).
func (cc *clientConn) preparedStmtID2CachePreparedStmt(stmtID uint32) (_ *plannercore.PlanCacheStmt, invalid bool) {
	sv := cc.ctx.GetSessionVars()
	if sv == nil {
		return nil, false
	}
	pointer, found := sv.PreparedStmts[stmtID]
	if !found {
		// not found
		return nil, false
	}
	cached, ok := pointer.(*plannercore.PlanCacheStmt)
	if !ok {
		// invalid cache. should never happen.
		return nil, true
	}
	return cached, false
}