session, com_stmt: fetch all rows during EXECUTE command (#42473)

close pingcap/tidb#41891, close pingcap/tidb#42424
YangKeao
2023-03-23 05:34:43 -04:00
committed by GitHub
parent cc02f50146
commit 2e8a982cb0
18 changed files with 113 additions and 539 deletions
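
In short: when COM_STMT_EXECUTE runs with a read-only cursor, the server now drains the whole result set immediately, stores the rows on the prepared statement, and closes the result set, so later COM_STMT_FETCH commands only read the cached rows and never touch the executor or the shared per-session prepared params again. The drain loop added to executePreparedStmtAndWriteResult follows the usual chunk-iteration pattern; below is a self-contained sketch of that pattern using stand-in types (chunkLike, recordSetLike), not the real server ResultSet / util/chunk API.

package sketch

import "context"

// rowData, chunkLike and recordSetLike are illustrative stand-ins for
// chunk.Row, chunk.Chunk and the server ResultSet; only the methods the
// drain loop needs are modelled.
type rowData []any

type chunkLike struct{ rows []rowData }

func (c *chunkLike) NumRows() int         { return len(c.rows) }
func (c *chunkLike) GetRow(i int) rowData { return c.rows[i] }
func (c *chunkLike) reset()               { c.rows = c.rows[:0] }

type recordSetLike interface {
	// Next fills chk with the next batch of rows; an empty chunk means EOF.
	Next(ctx context.Context, chk *chunkLike) error
}

// drainAllRows mirrors the loop added to executePreparedStmtAndWriteResult:
// call Next until an empty chunk comes back and copy every row out, so the
// rows outlive the result set (and the per-command allocator) once it is closed.
func drainAllRows(ctx context.Context, rs recordSetLike) ([]rowData, error) {
	chk := &chunkLike{}
	var rows []rowData
	for {
		if err := rs.Next(ctx, chk); err != nil {
			return nil, err
		}
		if chk.NumRows() == 0 {
			return rows, nil
		}
		for i := 0; i < chk.NumRows(); i++ {
			rows = append(rows, chk.GetRow(i))
		}
		chk.reset() // the real code renews the chunk with the session's MaxChunkSize
	}
}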

@@ -1340,11 +1340,13 @@ func TestLogAndShowSlowLog(t *testing.T) {
}
func TestReportingMinStartTimestamp(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomainWithSchemaLease(t, dbTestLease)
tk := testkit.NewTestKit(t, store)
se := tk.Session()
_, dom := testkit.CreateMockStoreAndDomainWithSchemaLease(t, dbTestLease)
infoSyncer := dom.InfoSyncer()
sm := &testkit.MockSessionManager{
PS: make([]*util.ProcessInfo, 0),
}
infoSyncer.SetSessionManager(sm)
beforeTS := oracle.GoTimeToTS(time.Now())
infoSyncer.ReportMinStartTS(dom.Store())
afterTS := oracle.GoTimeToTS(time.Now())
@@ -1353,21 +1355,13 @@ func TestReportingMinStartTimestamp(t *testing.T) {
now := time.Now()
validTS := oracle.GoTimeToLowerLimitStartTS(now.Add(time.Minute), tikv.MaxTxnTimeUse)
lowerLimit := oracle.GoTimeToLowerLimitStartTS(now, tikv.MaxTxnTimeUse)
sm := se.GetSessionManager().(*testkit.MockSessionManager)
sm.PS = []*util.ProcessInfo{
{CurTxnStartTS: 0, ProtectedTSList: &se.GetSessionVars().ProtectedTSList},
{CurTxnStartTS: math.MaxUint64, ProtectedTSList: &se.GetSessionVars().ProtectedTSList},
{CurTxnStartTS: lowerLimit, ProtectedTSList: &se.GetSessionVars().ProtectedTSList},
{CurTxnStartTS: validTS, ProtectedTSList: &se.GetSessionVars().ProtectedTSList},
{CurTxnStartTS: 0},
{CurTxnStartTS: math.MaxUint64},
{CurTxnStartTS: lowerLimit},
{CurTxnStartTS: validTS},
}
infoSyncer.ReportMinStartTS(dom.Store())
require.Equal(t, validTS, infoSyncer.GetMinStartTS())
unhold := se.GetSessionVars().ProtectedTSList.HoldTS(validTS - 1)
infoSyncer.ReportMinStartTS(dom.Store())
require.Equal(t, validTS-1, infoSyncer.GetMinStartTS())
unhold()
infoSyncer.SetSessionManager(sm)
infoSyncer.ReportMinStartTS(dom.Store())
require.Equal(t, validTS, infoSyncer.GetMinStartTS())
}

@@ -797,6 +797,8 @@ func (is *InfoSyncer) ReportMinStartTS(store kv.Storage) {
if sm == nil {
return
}
pl := sm.ShowProcessList()
innerSessionStartTSList := sm.GetInternalSessionStartTSList()
// Calculate the lower limit of the start timestamp to avoid extremely old transaction delaying GC.
currentVer, err := store.CurrentVersion(kv.GlobalTxnScope)
@@ -810,8 +812,18 @@ func (is *InfoSyncer) ReportMinStartTS(store kv.Storage) {
minStartTS := oracle.GoTimeToTS(now)
logutil.BgLogger().Debug("ReportMinStartTS", zap.Uint64("initial minStartTS", minStartTS),
zap.Uint64("StartTSLowerLimit", startTSLowerLimit))
if ts := sm.GetMinStartTS(startTSLowerLimit); ts > startTSLowerLimit && ts < minStartTS {
minStartTS = ts
for _, info := range pl {
if info.CurTxnStartTS > startTSLowerLimit && info.CurTxnStartTS < minStartTS {
minStartTS = info.CurTxnStartTS
}
}
for _, innerTS := range innerSessionStartTSList {
logutil.BgLogger().Debug("ReportMinStartTS", zap.Uint64("Internal Session Transaction StartTS", innerTS))
kv.PrintLongTimeInternalTxn(now, innerTS, false)
if innerTS > startTSLowerLimit && innerTS < minStartTS {
minStartTS = innerTS
}
}
is.minStartTS = kv.GetMinInnerTxnStartTS(now, startTSLowerLimit, minStartTS)
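
The replacement logic in ReportMinStartTS reduces to a min-selection: among the start timestamps reported by the process list and the internal sessions, pick the smallest one that is still strictly above the GC lower limit, starting from the current time as the upper bound. A standalone sketch of just that selection (function and variable names here are mine, not TiDB's):

package sketch

// pickMinStartTS picks the smallest candidate start-ts that is strictly
// greater than the GC lower limit; nowTS is returned when no candidate
// qualifies. In the real code the candidates come from the process list and
// the internal-session start-ts list.
func pickMinStartTS(nowTS, startTSLowerLimit uint64, candidates []uint64) uint64 {
	minStartTS := nowTS
	for _, ts := range candidates {
		if ts > startTSLowerLimit && ts < minStartTS {
			minStartTS = ts
		}
	}
	return minStartTS
}

// Example, mirroring the cases in TestReportingMinStartTimestamp: 0 and
// MaxUint64 are filtered out by the bounds, a ts below the lower limit is
// ignored, and the smallest valid ts wins.
//   pickMinStartTS(100, 10, []uint64{0, 5, 42, 90, ^uint64(0)}) == 42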

@@ -204,7 +204,6 @@ go_test(
"//util/plancodec",
"//util/resourcegrouptag",
"//util/rowcodec",
"//util/sqlexec",
"//util/stmtsummary/v2:stmtsummary",
"//util/syncutil",
"//util/topsql",

@@ -2331,34 +2331,12 @@ func (cc *clientConn) writeChunks(ctx context.Context, rs ResultSet, binary bool
// fetchSize, the desired number of rows to be fetched each time when client uses cursor.
func (cc *clientConn) writeChunksWithFetchSize(ctx context.Context, rs ResultSet, serverStatus uint16, fetchSize int) error {
fetchedRows := rs.GetFetchedRows()
// if fetchedRows is not enough, getting data from recordSet.
// NOTE: chunk should not be allocated from the allocator
// the allocator will reset every statement
// but it maybe stored in the result set among statements
// ref https://github.com/pingcap/tidb/blob/7fc6ebbda4ddf84c0ba801ca7ebb636b934168cf/server/conn_stmt.go#L233-L239
// Here server.tidbResultSet implements Next method.
req := rs.NewChunk(nil)
for len(fetchedRows) < fetchSize {
if err := rs.Next(ctx, req); err != nil {
return err
}
rowCount := req.NumRows()
if rowCount == 0 {
break
}
// filling fetchedRows with chunk
for i := 0; i < rowCount; i++ {
fetchedRows = append(fetchedRows, req.GetRow(i))
}
req = chunk.Renew(req, cc.ctx.GetSessionVars().MaxChunkSize)
}
// tell the client COM_STMT_FETCH has finished by setting proper serverStatus,
// and close ResultSet.
if len(fetchedRows) == 0 {
serverStatus &^= mysql.ServerStatusCursorExists
serverStatus |= mysql.ServerStatusLastRowSend
terror.Call(rs.Close)
return cc.writeEOF(ctx, serverStatus)
}
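
For reference, the serverStatus flags used above (and the expected byte changing from 0x82 to 0x42 in the TestDispatchClientProtocol41 hunk further down) are plain bit manipulation on the MySQL protocol status word. A small sketch with the flag values spelled out; the constants are redeclared here for illustration and match the values behind mysql.ServerStatusAutocommit, mysql.ServerStatusCursorExists and mysql.ServerStatusLastRowSend.

package sketch

import "fmt"

// MySQL protocol server status flags (standard protocol values).
const (
	serverStatusAutocommit   uint16 = 0x0002
	serverStatusCursorExists uint16 = 0x0040
	serverStatusLastRowSend  uint16 = 0x0080
)

func cursorStatusDemo() {
	// While the cursor still has cached rows: CURSOR_EXISTS | AUTOCOMMIT.
	status := serverStatusCursorExists | serverStatusAutocommit
	fmt.Printf("%#x\n", status) // 0x42

	// Once the cached rows are exhausted, the final EOF flips the flags.
	status &^= serverStatusCursorExists
	status |= serverStatusLastRowSend
	fmt.Printf("%#x\n", status) // 0x82
}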

@@ -56,6 +56,7 @@ import (
"github.com/pingcap/tidb/sessiontxn"
storeerr "github.com/pingcap/tidb/store/driver/error"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/hack"
"github.com/pingcap/tidb/util/topsql"
@@ -285,7 +286,7 @@ func (cc *clientConn) executePreparedStmtAndWriteResult(ctx context.Context, stm
}
// since there are multiple implementations of ResultSet (the rs might be wrapped), we have to unwrap the rs before
// casting it to *tidbResultSet.
if result, ok := unwrapResultSet(rs).(*tidbResultSet); ok {
if result, ok := rs.(*tidbResultSet); ok {
if planCacheStmt, ok := prepStmt.(*plannercore.PlanCacheStmt); ok {
result.preparedStmt = planCacheStmt
}
@@ -297,29 +298,54 @@ func (cc *clientConn) executePreparedStmtAndWriteResult(ctx context.Context, stm
if useCursor {
cc.initResultEncoder(ctx)
defer cc.rsEncoder.clean()
// fix https://github.com/pingcap/tidb/issues/39447. we need to hold the start-ts here because the process info
// will be set to sleep after fetch returned.
if pi := cc.ctx.ShowProcess(); pi != nil && pi.ProtectedTSList != nil && pi.CurTxnStartTS > 0 {
unhold := pi.HoldTS(pi.CurTxnStartTS)
rs = &rsWithHooks{ResultSet: rs, onClosed: unhold}
// fetch all results of the result set and store them locally, so that the following `FETCH` commands can read
// the rows directly without running the executor or touching the shared params/variables in the session.
// NOTE: the chunk should not be allocated from the connection allocator, which is reset after this command finishes,
// while the rows are still needed by the following `FETCH` commands.
//
// TODO: trace the memory used here
chk := rs.NewChunk(nil)
var rows []chunk.Row
for {
if err = rs.Next(ctx, chk); err != nil {
return false, err
}
rowCount := chk.NumRows()
if rowCount == 0 {
break
}
// filling fetchedRows with chunk
for i := 0; i < rowCount; i++ {
row := chk.GetRow(i)
rows = append(rows, row)
}
chk = chunk.Renew(chk, vars.MaxChunkSize)
}
rs.StoreFetchedRows(rows)
stmt.StoreResultSet(rs)
// also store the preparedParams in the stmt, so we could restore the params in the following fetch command
// the params should have been parsed in `(&cc.ctx).ExecuteStmt(ctx, execStmt)`.
stmt.StorePreparedCtx(&PreparedStatementCtx{
Params: vars.PreparedParams,
})
if err = cc.writeColumnInfo(rs.Columns()); err != nil {
return false, err
}
if cl, ok := rs.(fetchNotifier); ok {
cl.OnFetchReturned()
}
// since `Next` will never be called on this `ResultSet` again and all of its rows are already cached,
// we can close the `ResultSet` now.
err = rs.Close()
if err != nil {
return false, err
}
stmt.SetCursorActive(true)
// explicitly flush columnInfo to client.
err = cc.writeEOF(ctx, cc.ctx.Status())
if err != nil {
return false, err
}
return false, cc.flush(ctx)
}
defer terror.Call(rs.Close)
@@ -351,9 +377,6 @@ func (cc *clientConn) handleStmtFetch(ctx context.Context, data []byte) (err err
return errors.Annotate(mysql.NewErr(mysql.ErrUnknownStmtHandler,
strconv.FormatUint(uint64(stmtID), 10), "stmt_fetch"), cc.preparedStmt2String(stmtID))
}
cc.ctx.GetSessionVars().PreparedParams = stmt.GetPreparedCtx().Params
if topsqlstate.TopSQLEnabled() {
prepareObj, _ := cc.preparedStmtID2CachePreparedStmt(stmtID)
if prepareObj != nil && prepareObj.SQLDigest != nil {
@@ -371,10 +394,19 @@ func (cc *clientConn) handleStmtFetch(ctx context.Context, data []byte) (err err
strconv.FormatUint(uint64(stmtID), 10), "stmt_fetch_rs"), cc.preparedStmt2String(stmtID))
}
sendingEOF := false
// if `fetchedRows` is already empty before writing the result, this `FETCH` command will send the final EOF
if len(rs.GetFetchedRows()) == 0 {
sendingEOF = true
}
_, err = cc.writeResultSet(ctx, rs, true, cc.ctx.Status(), int(fetchSize))
if err != nil {
return errors.Annotate(err, cc.preparedStmt2String(stmtID))
}
if sendingEOF {
stmt.SetCursorActive(false)
}
return nil
}
@@ -712,6 +744,7 @@ func (cc *clientConn) handleStmtClose(data []byte) (err error) {
if stmt != nil {
return stmt.Close()
}
return
}
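
The lifecycle bookkeeping spread across handleStmtExecute, handleStmtFetch and the statement Reset/Close paths is small: the cursor is marked active once the rows are cached, and deactivated when a FETCH finds the cache already empty before writing (that FETCH sends the final EOF) or when the statement is reset or closed. A condensed, standalone sketch of just that state machine; the type and method names are mine, not the real clientConn/TiDBStatement code.

package sketch

// stmtCursor condenses the hasActiveCursor / fetchedRows bookkeeping from this patch.
type stmtCursor struct {
	fetchedRows [][]any
	active      bool
}

// onExecute caches the fully drained rows and activates the cursor.
func (s *stmtCursor) onExecute(rows [][]any) {
	s.fetchedRows = rows
	s.active = true
}

// onFetch pops up to fetchSize rows. If the cache is already empty before
// writing, this FETCH is the one that sends EOF and deactivates the cursor.
func (s *stmtCursor) onFetch(fetchSize int) (rows [][]any, sentEOF bool) {
	sendingEOF := len(s.fetchedRows) == 0
	n := fetchSize
	if n > len(s.fetchedRows) {
		n = len(s.fetchedRows)
	}
	rows, s.fetchedRows = s.fetchedRows[:n], s.fetchedRows[n:]
	if sendingEOF {
		s.active = false
	}
	return rows, sendingEOF
}

// onResetOrClose mirrors statement Reset/Close: drop the cached rows and
// deactivate the cursor.
func (s *stmtCursor) onResetOrClose() {
	s.fetchedRows = nil
	s.active = false
}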

@@ -20,7 +20,6 @@ import (
"encoding/binary"
"testing"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/parser/terror"
@@ -257,92 +256,6 @@ func TestParseStmtFetchCmd(t *testing.T) {
}
}
func TestCursorReadHoldTS(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
srv := CreateMockServer(t, store)
srv.SetDomain(dom)
defer srv.Close()
appendUint32 := binary.LittleEndian.AppendUint32
ctx := context.Background()
c := CreateMockConn(t, srv)
tk := testkit.NewTestKitWithSession(t, store, c.Context().Session)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int primary key)")
tk.MustExec("insert into t values (1), (2), (3), (4), (5), (6), (7), (8)")
tk.MustQuery("select count(*) from t").Check(testkit.Rows("8"))
stmt, _, _, err := c.Context().Prepare("select * from t")
require.NoError(t, err)
require.Zero(t, tk.Session().ShowProcess().GetMinStartTS(0))
// should hold ts after executing stmt with cursor
require.NoError(t, c.Dispatch(ctx, append(
appendUint32([]byte{mysql.ComStmtExecute}, uint32(stmt.ID())),
mysql.CursorTypeReadOnly, 0x1, 0x0, 0x0, 0x0,
)))
ts := tk.Session().ShowProcess().GetMinStartTS(0)
require.Positive(t, ts)
// should unhold ts when result set exhausted
require.NoError(t, c.Dispatch(ctx, appendUint32(appendUint32([]byte{mysql.ComStmtFetch}, uint32(stmt.ID())), 5)))
require.Equal(t, ts, tk.Session().ShowProcess().GetMinStartTS(0))
require.Equal(t, ts, srv.GetMinStartTS(0))
require.NoError(t, c.Dispatch(ctx, appendUint32(appendUint32([]byte{mysql.ComStmtFetch}, uint32(stmt.ID())), 5)))
require.Equal(t, ts, tk.Session().ShowProcess().GetMinStartTS(0))
require.Equal(t, ts, srv.GetMinStartTS(0))
require.NoError(t, c.Dispatch(ctx, appendUint32(appendUint32([]byte{mysql.ComStmtFetch}, uint32(stmt.ID())), 5)))
require.Zero(t, tk.Session().ShowProcess().GetMinStartTS(0))
// should hold ts after executing stmt with cursor
require.NoError(t, c.Dispatch(ctx, append(
appendUint32([]byte{mysql.ComStmtExecute}, uint32(stmt.ID())),
mysql.CursorTypeReadOnly, 0x1, 0x0, 0x0, 0x0,
)))
require.Positive(t, tk.Session().ShowProcess().GetMinStartTS(0))
// should unhold ts when stmt reset
require.NoError(t, c.Dispatch(ctx, appendUint32([]byte{mysql.ComStmtReset}, uint32(stmt.ID()))))
require.Zero(t, tk.Session().ShowProcess().GetMinStartTS(0))
// should hold ts after executing stmt with cursor
require.NoError(t, c.Dispatch(ctx, append(
appendUint32([]byte{mysql.ComStmtExecute}, uint32(stmt.ID())),
mysql.CursorTypeReadOnly, 0x1, 0x0, 0x0, 0x0,
)))
require.Positive(t, tk.Session().ShowProcess().GetMinStartTS(0))
// should unhold ts when stmt closed
require.NoError(t, c.Dispatch(ctx, appendUint32([]byte{mysql.ComStmtClose}, uint32(stmt.ID()))))
require.Zero(t, tk.Session().ShowProcess().GetMinStartTS(0))
// create another 2 stmts and execute them
stmt1, _, _, err := c.Context().Prepare("select * from t")
require.NoError(t, err)
require.NoError(t, c.Dispatch(ctx, append(
appendUint32([]byte{mysql.ComStmtExecute}, uint32(stmt1.ID())),
mysql.CursorTypeReadOnly, 0x1, 0x0, 0x0, 0x0,
)))
ts1 := tk.Session().ShowProcess().GetMinStartTS(0)
require.Positive(t, ts1)
stmt2, _, _, err := c.Context().Prepare("select * from t")
require.NoError(t, err)
require.NoError(t, c.Dispatch(ctx, append(
appendUint32([]byte{mysql.ComStmtExecute}, uint32(stmt2.ID())),
mysql.CursorTypeReadOnly, 0x1, 0x0, 0x0, 0x0,
)))
ts2 := tk.Session().ShowProcess().GetMinStartTS(ts1)
require.Positive(t, ts2)
require.Less(t, ts1, ts2)
require.Equal(t, ts1, srv.GetMinStartTS(0))
require.Equal(t, ts2, srv.GetMinStartTS(ts1))
require.Zero(t, srv.GetMinStartTS(ts2))
// should unhold all when session closed
c.Close()
require.Zero(t, tk.Session().ShowProcess().GetMinStartTS(0))
require.Zero(t, srv.GetMinStartTS(0))
}
func TestCursorExistsFlag(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
srv := CreateMockServer(t, store)
@@ -399,93 +312,6 @@ func TestCursorExistsFlag(t *testing.T) {
require.False(t, mysql.HasCursorExistsFlag(getLastStatus()))
}
func TestCursorReadWithRCCheckTS(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
srv := CreateMockServer(t, store)
srv.SetDomain(dom)
defer srv.Close()
appendUint32 := binary.LittleEndian.AppendUint32
ctx := context.Background()
c := CreateMockConn(t, srv).(*mockConn)
out := new(bytes.Buffer)
c.pkt.bufWriter.Reset(out)
c.capability |= mysql.ClientDeprecateEOF | mysql.ClientProtocol41
tk := testkit.NewTestKitWithSession(t, store, c.Context().Session)
c.Context().GetSessionVars().SetDistSQLScanConcurrency(1)
c.Context().GetSessionVars().InitChunkSize = 1
c.Context().GetSessionVars().MaxChunkSize = 1
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int primary key, b int, c int, key k1(b))")
// Prepare data.
tk.MustQuery("split table t by (100), (1000)")
tk.MustExec("set tidb_txn_mode='pessimistic'")
tk.MustExec("set tx_isolation='READ-COMMITTED'")
tk.MustExec("set tidb_rc_read_check_ts='on'")
tk.MustExec("set tidb_store_batch_size=0")
tk.MustExec("insert into t values (1, 1, 1), (2, 2, 2), (3, 3, 3), " +
"(104, 104, 104), (105, 105, 105), (106, 106, 106), " +
"(1007, 1007, 1007), (1008, 1008, 1008), (1009, 1009, 1009)")
tk.MustQuery("select count(*) from t").Check(testkit.Rows("9"))
getLastStatus := func() uint16 {
raw := out.Bytes()
return binary.LittleEndian.Uint16(raw[len(raw)-4 : len(raw)-2])
}
stmtCop, _, _, err := c.Context().Prepare("select * from t")
require.NoError(t, err)
stmtBatchGet, _, _, err := c.Context().Prepare("select * from t where a in (1, 2, 3, 104, 105, 106, 1007, 1008, 1009)")
require.NoError(t, err)
pauseCopIterTaskSender := "github.com/pingcap/tidb/store/copr/pauseCopIterTaskSender"
defer func() {
require.NoError(t, failpoint.Disable(pauseCopIterTaskSender))
}()
c.Context().GetSessionVars().MaxChunkSize = 1
for _, taskType := range []string{"BatchPointGet", "Cop"} {
tk.MustExec("begin pessimistic")
require.NoError(t, failpoint.Enable(pauseCopIterTaskSender, "pause"))
var stmtID uint32
if taskType == "BatchPointGet" {
stmtID = uint32(stmtBatchGet.ID())
require.NoError(t, c.Dispatch(ctx, append(
appendUint32([]byte{mysql.ComStmtExecute}, stmtID),
mysql.CursorTypeReadOnly, 0x1, 0x0, 0x0, 0x0,
)))
} else if taskType == "Cop" {
stmtID = uint32(stmtCop.ID())
require.NoError(t, c.Dispatch(ctx, append(
appendUint32([]byte{mysql.ComStmtExecute}, stmtID),
mysql.CursorTypeReadOnly, 0x1, 0x0, 0x0, 0x0,
)))
}
require.True(t, mysql.HasCursorExistsFlag(getLastStatus()))
// Fetch the first 3 rows.
require.NoError(t, c.Dispatch(ctx, appendUint32(appendUint32([]byte{mysql.ComStmtFetch}, stmtID), 3)))
require.True(t, mysql.HasCursorExistsFlag(getLastStatus()))
// The rows are updated by other transactions before the next fetch.
connUpdate := CreateMockConn(t, srv).(*mockConn)
tkUpdate := testkit.NewTestKitWithSession(t, store, connUpdate.Context().Session)
tkUpdate.MustExec("use test")
tkUpdate.MustExec("update t set c = c + 10 where a in (1, 104, 1007)")
// Fetch the next 3 rows, write conflict error should be reported if the `RCCheckTS` is used.
require.NoError(t, failpoint.Disable(pauseCopIterTaskSender))
require.NoError(t, c.Dispatch(ctx, appendUint32(appendUint32([]byte{mysql.ComStmtFetch}, stmtID), 3)))
require.True(t, mysql.HasCursorExistsFlag(getLastStatus()))
// Finish.
require.NoError(t, c.Dispatch(ctx, appendUint32([]byte{mysql.ComStmtReset}, stmtID)))
tk.MustExec("rollback")
}
}
func TestCursorWithParams(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
srv := CreateMockServer(t, store)
@@ -514,6 +340,10 @@ func TestCursorWithParams(t *testing.T) {
0x0, 0x1, 0x3, 0x0, 0x3, 0x0,
0x1, 0x0, 0x0, 0x0, 0x2, 0x0, 0x0, 0x0,
)))
rows := c.Context().stmts[stmt1.ID()].GetResultSet().GetFetchedRows()
require.Len(t, rows, 1)
require.Equal(t, int64(1), rows[0].GetInt64(0))
require.Equal(t, int64(2), rows[0].GetInt64(1))
// `execute stmt2 using 1` with cursor
require.NoError(t, c.Dispatch(ctx, append(
@@ -522,6 +352,12 @@ func TestCursorWithParams(t *testing.T) {
0x0, 0x1, 0x3, 0x0,
0x1, 0x0, 0x0, 0x0,
)))
rows = c.Context().stmts[stmt2.ID()].GetResultSet().GetFetchedRows()
require.Len(t, rows, 2)
require.Equal(t, int64(1), rows[0].GetInt64(0))
require.Equal(t, int64(1), rows[0].GetInt64(1))
require.Equal(t, int64(1), rows[1].GetInt64(0))
require.Equal(t, int64(2), rows[1].GetInt64(1))
// fetch stmt2 with fetch size 256
require.NoError(t, c.Dispatch(ctx, append(
@@ -529,7 +365,8 @@ func TestCursorWithParams(t *testing.T) {
0x0, 0x1, 0x0, 0x0,
)))
// fetch stmt1 with fetch size 256, as it has more params, if we didn't restore the parameters, it will panic.
// fetch stmt1 with fetch size 256. As stmt1 has more params, if its rows had not already been fetched during the
// EXECUTE command, this fetch would panic because the params have been overwritten and are no longer long enough.
require.NoError(t, c.Dispatch(ctx, append(
appendUint32([]byte{mysql.ComStmtFetch}, uint32(stmt1.ID())),
0x0, 0x1, 0x0, 0x0,
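
The comment in TestCursorWithParams above describes the hazard that motivated eager fetching: prepared-statement parameters live in session scope (vars.PreparedParams), so executing stmt2 overwrites the params a lazy FETCH of stmt1 would still need. A toy illustration of that failure mode (not TiDB code):

package sketch

import "fmt"

// paramOverwriteDemo shows why lazily reading session-scoped params at FETCH
// time is unsafe once another statement has been executed in between.
func paramOverwriteDemo() {
	sessionParams := []int{1, 2} // set by `execute stmt1 using 1, 2`
	stmt1ParamCount := 2

	sessionParams = []int{1} // overwritten by `execute stmt2 using 1`

	// A lazy FETCH of stmt1 would now read stmt1ParamCount params from a
	// shorter slice: index out of range, i.e. the panic the comment mentions.
	if stmt1ParamCount > len(sessionParams) {
		fmt.Println("params overwritten and no longer long enough")
	}
}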

@@ -524,7 +524,7 @@ func TestDispatchClientProtocol41(t *testing.T) {
com: mysql.ComStmtFetch,
in: []byte{0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0},
err: nil,
out: []byte{0x5, 0x0, 0x0, 0x9, 0xfe, 0x0, 0x0, 0x82, 0x0},
out: []byte{0x5, 0x0, 0x0, 0x9, 0xfe, 0x0, 0x0, 0x42, 0x0},
},
{
com: mysql.ComStmtReset,
@@ -1022,8 +1022,7 @@ func TestTiFlashFallback(t *testing.T) {
tk.MustQuery("show warnings").Check(testkit.Rows("Error 9012 TiFlash server timeout"))
// test COM_STMT_FETCH (cursor mode)
require.NoError(t, cc.handleStmtExecute(ctx, []byte{0x1, 0x0, 0x0, 0x0, 0x1, 0x1, 0x0, 0x0, 0x0}))
require.Error(t, cc.handleStmtFetch(ctx, []byte{0x1, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0}))
require.Error(t, cc.handleStmtExecute(ctx, []byte{0x1, 0x0, 0x0, 0x0, 0x1, 0x1, 0x0, 0x0, 0x0}))
tk.MustExec("set @@tidb_allow_fallback_to_tikv=''")
require.Error(t, cc.handleStmtExecute(ctx, []byte{0x1, 0x0, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0x0}))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/store/mockstore/unistore/BatchCopRpcErrtiflash0"))

@@ -20,7 +20,6 @@ import (
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/extension"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/util/chunk"
)
@@ -30,17 +29,6 @@ type IDriver interface {
OpenCtx(connID uint64, capability uint32, collation uint8, dbname string, tlsState *tls.ConnectionState, extensions *extension.SessionExtensions) (*TiDBContext, error)
}
// PreparedStatementCtx stores the context generated in `execute` statement for a prepared statement
// subsequent stmt fetching could restore the session variables from this context
type PreparedStatementCtx struct {
// Params is the params used in `execute` statement
Params variable.PreparedParams
// TODO: store and restore variables, but be careful that we'll also need to restore the variables after FETCH
// a cleaner way to solve this problem is to always reading params from a statement scope (but not session scope)
// context. But switching in/out related context is simpler on current code base, and the affected radius is more
// controllable.
}
// PreparedStatement is the interface to use a prepared statement.
type PreparedStatement interface {
// ID returns statement ID
@@ -70,17 +58,17 @@ type PreparedStatement interface {
// GetResultSet gets ResultSet associated this statement
GetResultSet() ResultSet
// StorePreparedCtx stores context in `execute` statement for subsequent stmt fetching
StorePreparedCtx(ctx *PreparedStatementCtx)
// GetPreparedParams gets the prepared params associated this statement
GetPreparedCtx() *PreparedStatementCtx
// Reset removes all bound parameters.
Reset()
// Close closes the statement.
Close() error
// GetCursorActive returns whether the statement has an active cursor
GetCursorActive() bool
// SetCursorActive sets whether the statement has an active cursor
SetCursorActive(active bool)
}
// ResultSet is the result set of a query.

@@ -66,10 +66,13 @@ type TiDBStatement struct {
boundParams [][]byte
paramsType []byte
ctx *TiDBContext
rs ResultSet
sql string
// this result set should have been closed before being stored here; only its `fetchedRows` are used. The field is
// kept as a ResultSet (rather than storing the rows directly) to reuse the logic inside the `writeResultSet...` functions.
// TODO: move the `fetchedRows` into the statement, and remove the `ResultSet` from the statement.
rs ResultSet
sql string
preparedStatementCtx *PreparedStatementCtx
hasActiveCursor bool
}
// ID implements PreparedStatement ID method.
@@ -144,27 +147,12 @@ func (ts *TiDBStatement) GetResultSet() ResultSet {
return ts.rs
}
// StorePreparedCtx implements PreparedStatement StorePreparedCtx method.
func (ts *TiDBStatement) StorePreparedCtx(ctx *PreparedStatementCtx) {
ts.preparedStatementCtx = ctx
}
// GetPreparedCtx implements PreparedStatement GetPreparedCtx method.
func (ts *TiDBStatement) GetPreparedCtx() *PreparedStatementCtx {
return ts.preparedStatementCtx
}
// Reset implements PreparedStatement Reset method.
func (ts *TiDBStatement) Reset() {
for i := range ts.boundParams {
ts.boundParams[i] = nil
}
// closing previous ResultSet if it exists
if ts.rs != nil {
terror.Call(ts.rs.Close)
ts.rs = nil
}
ts.hasActiveCursor = false
}
// Close implements PreparedStatement Close method.
@@ -195,14 +183,19 @@ func (ts *TiDBStatement) Close() error {
ts.ctx.GetSessionVars().RemovePreparedStmt(ts.id)
}
delete(ts.ctx.stmts, int(ts.id))
// close ResultSet associated with this statement
if ts.rs != nil {
terror.Call(ts.rs.Close)
}
return nil
}
// GetCursorActive implements PreparedStatement GetCursorActive method.
func (ts *TiDBStatement) GetCursorActive() bool {
return ts.hasActiveCursor
}
// SetCursorActive implements PreparedStatement SetCursorActive method.
func (ts *TiDBStatement) SetCursorActive(fetchEnd bool) {
ts.hasActiveCursor = fetchEnd
}
// OpenCtx implements IDriver.
func (qd *TiDBDriver) OpenCtx(connID uint64, capability uint32, collation uint8, dbname string, tlsState *tls.ConnectionState, extensions *extension.SessionExtensions) (*TiDBContext, error) {
se, err := session.CreateSession(qd.store)
@@ -368,8 +361,8 @@ func (tc *TiDBContext) EncodeSessionStates(ctx context.Context, sctx sessionctx.
return sessionstates.ErrCannotMigrateSession.GenWithStackByArgs("prepared statements have bound params")
}
}
if rs := stmt.GetResultSet(); rs != nil && !rs.IsClosed() {
return sessionstates.ErrCannotMigrateSession.GenWithStackByArgs("prepared statements have open result sets")
if stmt.GetCursorActive() {
return sessionstates.ErrCannotMigrateSession.GenWithStackByArgs("prepared statements have unfetched rows")
}
preparedStmtInfo.ParamTypes = stmt.GetParamsType()
}
@@ -492,46 +485,6 @@ func (trs *tidbResultSet) Columns() []*ColumnInfo {
return trs.columns
}
// rsWithHooks wraps a ResultSet with some hooks (currently only onClosed).
type rsWithHooks struct {
ResultSet
onClosed func()
}
// Close implements ResultSet#Close
func (rs *rsWithHooks) Close() error {
closed := rs.IsClosed()
err := rs.ResultSet.Close()
if !closed && rs.onClosed != nil {
rs.onClosed()
}
return err
}
// OnFetchReturned implements fetchNotifier#OnFetchReturned
func (rs *rsWithHooks) OnFetchReturned() {
if impl, ok := rs.ResultSet.(fetchNotifier); ok {
impl.OnFetchReturned()
}
}
// Unwrap returns the underlying result set
func (rs *rsWithHooks) Unwrap() ResultSet {
return rs.ResultSet
}
// unwrapResultSet likes errors.Cause but for ResultSet
func unwrapResultSet(rs ResultSet) ResultSet {
var unRS ResultSet
if u, ok := rs.(interface{ Unwrap() ResultSet }); ok {
unRS = u.Unwrap()
}
if unRS == nil {
return rs
}
return unwrapResultSet(unRS)
}
func convertColumnInfo(fld *ast.ResultField) (ci *ColumnInfo) {
ci = &ColumnInfo{
Name: fld.ColumnAsName.O,

@@ -22,7 +22,6 @@ import (
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/sqlexec"
"github.com/stretchr/testify/require"
)
@@ -95,27 +94,3 @@ func TestConvertColumnInfo(t *testing.T) {
colInfo = convertColumnInfo(&resultField)
require.Equal(t, uint32(4), colInfo.ColumnLength)
}
func TestRSWithHooks(t *testing.T) {
closeCount := 0
rs := &rsWithHooks{
ResultSet: &tidbResultSet{recordSet: new(sqlexec.SimpleRecordSet)},
onClosed: func() { closeCount++ },
}
require.Equal(t, 0, closeCount)
rs.Close()
require.Equal(t, 1, closeCount)
rs.Close()
require.Equal(t, 1, closeCount)
}
func TestUnwrapRS(t *testing.T) {
var nilRS ResultSet
require.Nil(t, unwrapResultSet(nilRS))
rs0 := new(tidbResultSet)
rs1 := &rsWithHooks{ResultSet: rs0}
rs2 := &rsWithHooks{ResultSet: rs1}
for _, rs := range []ResultSet{rs0, rs1, rs2} {
require.Equal(t, rs0, unwrapResultSet(rs))
}
}

@@ -1012,37 +1012,3 @@ func (s *Server) KillNonFlashbackClusterConn() {
s.Kill(id, false)
}
}
// GetMinStartTS implements SessionManager interface.
func (s *Server) GetMinStartTS(lowerBound uint64) (ts uint64) {
// sys processes
if s.dom != nil {
for _, pi := range s.dom.SysProcTracker().GetSysProcessList() {
if thisTS := pi.GetMinStartTS(lowerBound); thisTS > lowerBound && (thisTS < ts || ts == 0) {
ts = thisTS
}
}
}
// user sessions
func() {
s.rwlock.RLock()
defer s.rwlock.RUnlock()
for _, client := range s.clients {
if thisTS := client.ctx.ShowProcess().GetMinStartTS(lowerBound); thisTS > lowerBound && (thisTS < ts || ts == 0) {
ts = thisTS
}
}
}()
// internal sessions
func() {
s.sessionMapMutex.Lock()
defer s.sessionMapMutex.Unlock()
analyzeProcID := util.GetAutoAnalyzeProcID(s.ServerID)
for se := range s.internalSessions {
if thisTS, processInfoID := session.GetStartTSFromSession(se); processInfoID != analyzeProcID && thisTS > lowerBound && (thisTS < ts || ts == 0) {
ts = thisTS
}
}
}()
return
}

@@ -1556,7 +1556,6 @@ func (s *session) SetProcessInfo(sql string, t time.Time, command byte, maxExecu
OOMAlarmVariablesInfo: s.getOomAlarmVariablesInfo(),
MaxExecutionTime: maxExecutionTime,
RedactSQL: s.sessionVars.EnableRedactLog,
ProtectedTSList: &s.sessionVars.ProtectedTSList,
ResourceGroupName: s.sessionVars.ResourceGroupName,
}
oldPi := s.ShowProcess()

@@ -1384,9 +1384,6 @@ type SessionVars struct {
// Resource group name
ResourceGroupName string
// ProtectedTSList holds a list of timestamps that should delay GC.
ProtectedTSList protectedTSList
// PessimisticTransactionFairLocking controls whether fair locking for pessimistic transaction
// is enabled.
PessimisticTransactionFairLocking bool
@@ -3315,53 +3312,3 @@ func (s *SessionVars) GetRelatedTableForMDL() *sync.Map {
func (s *SessionVars) EnableForceInlineCTE() bool {
return s.enableForceInlineCTE
}
// protectedTSList implements util/processinfo#ProtectedTSList
type protectedTSList struct {
sync.Mutex
items map[uint64]int
}
// HoldTS holds the timestamp to prevent its data from being GCed.
func (lst *protectedTSList) HoldTS(ts uint64) (unhold func()) {
lst.Lock()
if lst.items == nil {
lst.items = map[uint64]int{}
}
lst.items[ts] += 1
lst.Unlock()
var once sync.Once
return func() {
once.Do(func() {
lst.Lock()
if lst.items != nil {
if lst.items[ts] > 1 {
lst.items[ts] -= 1
} else {
delete(lst.items, ts)
}
}
lst.Unlock()
})
}
}
// GetMinProtectedTS returns the minimum protected timestamp that greater than `lowerBound` (0 if no such one).
func (lst *protectedTSList) GetMinProtectedTS(lowerBound uint64) (ts uint64) {
lst.Lock()
for k, v := range lst.items {
if v > 0 && k > lowerBound && (k < ts || ts == 0) {
ts = k
}
}
lst.Unlock()
return
}
// Size returns the number of protected timestamps (exported for test).
func (lst *protectedTSList) Size() (size int) {
lst.Lock()
size = len(lst.items)
lst.Unlock()
return
}

@@ -489,60 +489,6 @@ func TestGetReuseChunk(t *testing.T) {
require.Nil(t, sessVars.ChunkPool.Alloc)
}
func TestPretectedTSList(t *testing.T) {
lst := &variable.NewSessionVars(nil).ProtectedTSList
// empty set
require.Equal(t, uint64(0), lst.GetMinProtectedTS(0))
require.Equal(t, uint64(0), lst.GetMinProtectedTS(1))
require.Equal(t, 0, lst.Size())
// hold 1
unhold1 := lst.HoldTS(1)
require.Equal(t, uint64(1), lst.GetMinProtectedTS(0))
require.Equal(t, uint64(0), lst.GetMinProtectedTS(1))
// hold 2 twice
unhold2a := lst.HoldTS(2)
unhold2b := lst.HoldTS(2)
require.Equal(t, uint64(1), lst.GetMinProtectedTS(0))
require.Equal(t, uint64(2), lst.GetMinProtectedTS(1))
require.Equal(t, uint64(0), lst.GetMinProtectedTS(2))
require.Equal(t, 2, lst.Size())
// unhold 2a
unhold2a()
require.Equal(t, uint64(1), lst.GetMinProtectedTS(0))
require.Equal(t, uint64(2), lst.GetMinProtectedTS(1))
require.Equal(t, uint64(0), lst.GetMinProtectedTS(2))
require.Equal(t, 2, lst.Size())
// unhold 2a again
unhold2a()
require.Equal(t, uint64(1), lst.GetMinProtectedTS(0))
require.Equal(t, uint64(2), lst.GetMinProtectedTS(1))
require.Equal(t, uint64(0), lst.GetMinProtectedTS(2))
require.Equal(t, 2, lst.Size())
// unhold 1
unhold1()
require.Equal(t, uint64(2), lst.GetMinProtectedTS(0))
require.Equal(t, uint64(2), lst.GetMinProtectedTS(1))
require.Equal(t, uint64(0), lst.GetMinProtectedTS(2))
require.Equal(t, 1, lst.Size())
// unhold 2b
unhold2b()
require.Equal(t, uint64(0), lst.GetMinProtectedTS(0))
require.Equal(t, uint64(0), lst.GetMinProtectedTS(1))
require.Equal(t, 0, lst.Size())
// unhold 2b again
unhold2b()
require.Equal(t, uint64(0), lst.GetMinProtectedTS(0))
require.Equal(t, uint64(0), lst.GetMinProtectedTS(1))
require.Equal(t, 0, lst.Size())
}
func TestUserVarConcurrently(t *testing.T) {
sv := variable.NewSessionVars(nil)
ctx, cancel := context.WithTimeout(context.Background(), time.Second)

@@ -22,7 +22,6 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/parser/terror"
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/sessionctx"
@@ -108,8 +107,7 @@ func NeedSetRCCheckTSFlag(ctx sessionctx.Context, node ast.Node) bool {
sessionVars := ctx.GetSessionVars()
if sessionVars.ConnectionID > 0 && variable.EnableRCReadCheckTS.Load() &&
sessionVars.InTxn() && !sessionVars.RetryInfo.Retrying &&
plannercore.IsReadOnly(node, sessionVars) &&
!ctx.GetSessionVars().GetStatusFlag(mysql.ServerStatusCursorExists) {
plannercore.IsReadOnly(node, sessionVars) {
return true
}
return false

@@ -74,7 +74,7 @@ go_test(
],
embed = [":tables"],
flaky = True,
shard_count = 10,
shard_count = 50,
deps = [
"//ddl",
"//domain",

@@ -145,7 +145,7 @@ func (msm *MockSessionManager) KillNonFlashbackClusterConn() {
}
}
// CheckOldRunningTxn implement SessionManager interface.
// CheckOldRunningTxn gets the startTS of every transaction running in the current internal sessions
func (msm *MockSessionManager) CheckOldRunningTxn(job2ver map[int64]int64, job2ids map[int64]string) {
msm.mu.Lock()
for _, se := range msm.conn {
@@ -153,25 +153,3 @@ func (msm *MockSessionManager) CheckOldRunningTxn(job2ver map[int64]int64, job2i
}
msm.mu.Unlock()
}
// GetMinStartTS implements SessionManager interface.
func (msm *MockSessionManager) GetMinStartTS(lowerBound uint64) (ts uint64) {
msm.PSMu.RLock()
defer msm.PSMu.RUnlock()
if len(msm.PS) > 0 {
for _, pi := range msm.PS {
if thisTS := pi.GetMinStartTS(lowerBound); thisTS > lowerBound && (thisTS < ts || ts == 0) {
ts = thisTS
}
}
return
}
msm.mu.Lock()
defer msm.mu.Unlock()
for _, s := range msm.conn {
if thisTS := s.ShowProcess().GetMinStartTS(lowerBound); thisTS > lowerBound && (thisTS < ts || ts == 0) {
ts = thisTS
}
}
return
}

@@ -31,14 +31,6 @@ import (
"github.com/tikv/client-go/v2/oracle"
)
// ProtectedTSList holds a list of timestamps that should delay GC.
type ProtectedTSList interface {
// HoldTS holds the timestamp to prevent its data from being GCed.
HoldTS(ts uint64) (unhold func())
// GetMinProtectedTS returns the minimum protected timestamp that greater than `lowerBound` (0 if no such one).
GetMinProtectedTS(lowerBound uint64) (ts uint64)
}
// OOMAlarmVariablesInfo is a struct for OOM alarm variables.
type OOMAlarmVariablesInfo struct {
SessionAnalyzeVersion int
@@ -48,7 +40,6 @@ type OOMAlarmVariablesInfo struct {
// ProcessInfo is a struct used for show processlist statement.
type ProcessInfo struct {
ProtectedTSList
Time time.Time
ExpensiveLogTime time.Time
Plan interface{}
@@ -139,23 +130,6 @@ func (pi *ProcessInfo) ToRow(tz *time.Location) []interface{} {
return append(pi.ToRowForShow(true), pi.Digest, bytesConsumed, diskConsumed, pi.txnStartTs(tz), pi.ResourceGroupName)
}
// GetMinStartTS returns the minimum start-ts (used to delay GC) that greater than `lowerBound` (0 if no such one).
func (pi *ProcessInfo) GetMinStartTS(lowerBound uint64) (ts uint64) {
if pi == nil {
return
}
if thisTS := pi.CurTxnStartTS; thisTS > lowerBound && (thisTS < ts || ts == 0) {
ts = thisTS
}
if pi.ProtectedTSList == nil {
return
}
if thisTS := pi.GetMinProtectedTS(lowerBound); thisTS > lowerBound && (thisTS < ts || ts == 0) {
ts = thisTS
}
return
}
// ascServerStatus is a slice of all defined server status in ascending order.
var ascServerStatus = []uint16{
mysql.ServerStatusInTrans,
@@ -224,8 +198,6 @@ type SessionManager interface {
CheckOldRunningTxn(job2ver map[int64]int64, job2ids map[int64]string)
// KillNonFlashbackClusterConn kill all non flashback cluster connections.
KillNonFlashbackClusterConn()
// GetMinStartTS returns the minimum start-ts (used to delay GC) that greater than `lowerBound` (0 if no such one).
GetMinStartTS(lowerBound uint64) uint64
}
// GlobalConnID is the global connection ID, providing UNIQUE connection IDs across the whole TiDB cluster.