Merge pull request #642 from pingcap/siddontang/auto-rollback
tidb: err should rollback in auto commit or DDL.
This commit is contained in:
12
session.go
12
session.go
@ -95,7 +95,6 @@ func (h *stmtHistory) add(stmtID uint32, st stmt.Statement, params ...interface{
|
||||
}
|
||||
|
||||
func (h *stmtHistory) reset() {
|
||||
|
||||
if len(h.history) > 0 {
|
||||
h.history = h.history[:0]
|
||||
}
|
||||
@ -484,17 +483,10 @@ func (s *session) GetTxn(forceNew bool) (kv.Transaction, error) {
|
||||
return s.txn, nil
|
||||
}
|
||||
if forceNew {
|
||||
err = s.txn.Commit()
|
||||
variable.GetSessionVars(s).SetStatusFlag(mysql.ServerStatusInTrans, false)
|
||||
err = s.FinishTxn(false)
|
||||
if err != nil {
|
||||
if !s.retrying && kv.IsRetryableError(err) {
|
||||
err = s.Retry()
|
||||
}
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
s.resetHistory()
|
||||
s.txn, err = s.store.Begin()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
||||
@ -1176,3 +1176,45 @@ func (s *testSessionSuite) TestSession(c *C) {
|
||||
err := se.Close()
|
||||
c.Assert(err, IsNil)
|
||||
}
|
||||
|
||||
func (s *testSessionSuite) TestErrorRollback(c *C) {
|
||||
store := newStore(c, s.dbName)
|
||||
s1 := newSession(c, store, s.dbName)
|
||||
|
||||
defer s1.Close()
|
||||
|
||||
mustExecSQL(c, s1, "drop table if exists t_rollback")
|
||||
mustExecSQL(c, s1, "create table t_rollback (c1 int, c2 int, primary key(c1))")
|
||||
|
||||
_, err := s1.Execute("insert into t_rollback values (0, 0)")
|
||||
c.Assert(err, IsNil)
|
||||
|
||||
var wg sync.WaitGroup
|
||||
cnt := 10
|
||||
wg.Add(cnt)
|
||||
num := 1000
|
||||
|
||||
for i := 0; i < cnt; i++ {
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
se := newSession(c, store, s.dbName)
|
||||
// retry forever
|
||||
se.(*session).maxRetryCnt = unlimitedRetryCnt
|
||||
defer se.Close()
|
||||
|
||||
for j := 0; j < num; j++ {
|
||||
// force generate a txn in session for later insert use.
|
||||
se.(*session).GetTxn(false)
|
||||
|
||||
se.Execute("insert into t_rollback values (1, 1, 1)")
|
||||
|
||||
_, err = se.Execute("update t_rollback set c2 = c2 + 1 where c1 = 0")
|
||||
c.Assert(err, IsNil)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
|
||||
mustExecMatch(c, s1, "select c2 from t_rollback where c1 = 0", [][]interface{}{{cnt * num}})
|
||||
}
|
||||
|
||||
8
tidb.go
8
tidb.go
@ -225,8 +225,12 @@ func runStmt(ctx context.Context, s stmt.Statement, args ...interface{}) (rset.R
|
||||
se.history.add(0, s)
|
||||
}
|
||||
// MySQL DDL should be auto-commit
|
||||
if err == nil && (s.IsDDL() || autocommit.ShouldAutocommit(ctx)) {
|
||||
err = ctx.FinishTxn(false)
|
||||
if s.IsDDL() || autocommit.ShouldAutocommit(ctx) {
|
||||
if err != nil {
|
||||
ctx.FinishTxn(true)
|
||||
} else {
|
||||
err = ctx.FinishTxn(false)
|
||||
}
|
||||
}
|
||||
return rs, errors.Trace(err)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user