From 30300f2af755a3c15c4d1ce84416515efdd9346e Mon Sep 17 00:00:00 2001 From: ngaut Date: Tue, 8 Sep 2015 20:47:07 +0800 Subject: [PATCH 01/10] session: Experimental transaction retry Support query like: set a=a+1 --- driver.go | 9 +++- session.go | 117 +++++++++++++++++++++++++++++++++++++++++++++------ tidb_test.go | 7 ++- 3 files changed, 118 insertions(+), 15 deletions(-) diff --git a/driver.go b/driver.go index c69618736b..64360c3bcd 100644 --- a/driver.go +++ b/driver.go @@ -29,12 +29,14 @@ import ( "sync" "github.com/juju/errors" + "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/model" mysql "github.com/pingcap/tidb/mysqldef" "github.com/pingcap/tidb/rset" "github.com/pingcap/tidb/sessionctx" qerror "github.com/pingcap/tidb/util/errors" "github.com/pingcap/tidb/util/types" + "github.com/reborndb/go/errors2" ) const ( @@ -238,8 +240,13 @@ func (c *driverConn) Commit() error { if c.s == nil { return qerror.ErrCommitNotInTransaction } + _, err := c.s.Execute(txCommitSQL) - if _, err := c.s.Execute(txCommitSQL); err != nil { + if errors2.ErrorEqual(err, kv.ErrConditionNotMatch) { + return c.s.Retry() + } + + if err != nil { return err } diff --git a/session.go b/session.go index 7411b908c8..91e54c00da 100644 --- a/session.go +++ b/session.go @@ -32,7 +32,10 @@ import ( "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/db" "github.com/pingcap/tidb/sessionctx/variable" + "github.com/pingcap/tidb/stmt" + "github.com/pingcap/tidb/stmt/stmts" "github.com/pingcap/tidb/util/types" + "github.com/reborndb/go/errors2" ) // Session context @@ -51,6 +54,7 @@ type Session interface { DropPreparedStmt(stmtID uint32) error SetClientCapability(uint32) // Set client capability flags Close() error + Retry() error } var ( @@ -58,14 +62,46 @@ var ( sessionID int64 ) +type stmtRecord struct { + stmtID uint32 + st stmt.Statement + params []interface{} +} + +type stmtHistory struct { + history []*stmtRecord +} + +func (h *stmtHistory) add(stmtID uint32, st stmt.Statement, params ...interface{}) { + s := &stmtRecord{ + stmtID: stmtID, + st: st, + params: append(([]interface{})(nil), params...), + } + + h.history = append(h.history, s) +} + +func (h *stmtHistory) reset() { + h.history = h.history[:0] +} + +func (h *stmtHistory) clone() *stmtHistory { + nh := *h + nh.history = make([]*stmtRecord, len(h.history)) + copy(nh.history, h.history) + return &nh +} + type session struct { txn kv.Transaction // Current transaction userName string args []interface{} // Statment execution args, this should be cleaned up after exec - values map[fmt.Stringer]interface{} - store kv.Storage - sid int64 + values map[fmt.Stringer]interface{} + store kv.Storage + sid int64 + history stmtHistory } func (s *session) Status() uint16 { @@ -104,9 +140,11 @@ func (s *session) FinishTxn(rollback bool) error { err := s.txn.Commit() if err != nil { log.Errorf("txn:%s, %v", s.txn, err) + return errors.Trace(err) } - return errors.Trace(err) + s.history.reset() + return nil } func (s *session) String() string { @@ -126,8 +164,55 @@ func (s *session) String() string { return string(b) } +func isPreparedStmt(st stmt.Statement) bool { + switch st.(type) { + case *stmts.PreparedStmt: + return true + default: + return false + } +} + +func (s *session) Retry() error { + nh := s.history.clone() + defer func() { + s.history.history = nh.history + }() + + var err error + for { + // Clear history + s.history.history = s.history.history[:0] + s.FinishTxn(true) + // TODO: check if select for update statement + success := true + for _, sr := range nh.history { + st := sr.st + // Skip prepare statement + if isPreparedStmt(st) { + continue + } + log.Info("Retry %s", st.OriginText()) + _, err = runStmt(s, st) + if err != nil { + if errors2.ErrorEqual(err, kv.ErrConditionNotMatch) { + success = false + break + } + log.Warnf("session:%v, err:%v", s, err) + return errors.Trace(err) + } + } + if success { + return nil + } + } + + return nil +} + func (s *session) Execute(sql string) ([]rset.Recordset, error) { - stmts, err := Compile(sql) + statements, err := Compile(sql) if err != nil { log.Errorf("Syntax error: %s", sql) log.Errorf("Error occurs at %s.", err) @@ -136,13 +221,21 @@ func (s *session) Execute(sql string) ([]rset.Recordset, error) { var rs []rset.Recordset - for _, si := range stmts { - r, err := runStmt(s, si) + for _, st := range statements { + r, err := runStmt(s, st) if err != nil { log.Warnf("session:%v, err:%v", s, err) return nil, errors.Trace(err) } + // Record executed query + if isPreparedStmt(st) { + ps := st.(*stmts.PreparedStmt) + s.history.add(ps.ID, st) + } else { + s.history.add(0, st) + } + if r != nil { rs = append(rs, r) } @@ -183,12 +276,10 @@ func (s *session) ExecutePreparedStmt(stmtID uint32, args ...interface{}) (rset. if err != nil { return nil, err } - //convert args to param - rs, err := executePreparedStmt(s, stmtID, args...) - if err != nil { - return nil, err - } - return rs, nil + + st := &stmts.ExecuteStmt{ID: stmtID} + s.history.add(stmtID, st, args...) + return runStmt(s, st, args...) } func (s *session) DropPreparedStmt(stmtID uint32) error { diff --git a/tidb_test.go b/tidb_test.go index 56fc4c8985..85b22a0712 100644 --- a/tidb_test.go +++ b/tidb_test.go @@ -28,6 +28,7 @@ import ( mysql "github.com/pingcap/tidb/mysqldef" "github.com/pingcap/tidb/rset" "github.com/pingcap/tidb/sessionctx/variable" + "github.com/reborndb/go/errors2" ) var store = flag.String("store", "memory", "registered store name, [memory, goleveldb, boltdb]") @@ -480,7 +481,6 @@ func (s *testSessionSuite) TestAutoicommit(c *C) { // See: http://dev.mysql.com/doc/refman/5.7/en/commit.html func (s *testSessionSuite) TestRowLock(c *C) { - c.Skip("Need retry feature") store := newStore(c, s.dbName) se := newSession(c, store, s.dbName) se1 := newSession(c, store, s.dbName) @@ -502,6 +502,11 @@ func (s *testSessionSuite) TestRowLock(c *C) { _, err := exec(c, se1, "commit") // row lock conflict but can still success + if errors2.ErrorNotEqual(err, kv.ErrConditionNotMatch) { + c.Fail() + } + // Retry should success + err = se.Retry() c.Assert(err, IsNil) mustExecSQL(c, se1, "begin") From 2e690db41e050427d8bf52c02122b2651714a89b Mon Sep 17 00:00:00 2001 From: ngaut Date: Wed, 9 Sep 2015 12:03:21 +0800 Subject: [PATCH 02/10] session: Check if select for update statement Make sure we do not retry "select for update statement" --- plan/plans/lock.go | 2 ++ session.go | 9 +++++-- sessionctx/forupdate/for_update_ctx.go | 33 ++++++++++++++++++++++++++ tidb_test.go | 3 +++ 4 files changed, 45 insertions(+), 2 deletions(-) create mode 100644 sessionctx/forupdate/for_update_ctx.go diff --git a/plan/plans/lock.go b/plan/plans/lock.go index 703637a8a4..6762e870f5 100644 --- a/plan/plans/lock.go +++ b/plan/plans/lock.go @@ -20,6 +20,7 @@ import ( "github.com/pingcap/tidb/field" "github.com/pingcap/tidb/parser/coldef" "github.com/pingcap/tidb/plan" + "github.com/pingcap/tidb/sessionctx/forupdate" "github.com/pingcap/tidb/util/format" ) @@ -48,6 +49,7 @@ func (r *SelectLockPlan) Do(ctx context.Context, f plan.RowIterFunc) error { } } if rowKeys != nil && r.Lock == coldef.SelectLockForUpdate { + forupdate.SetForUpdate(ctx) txn, err := ctx.GetTxn(false) if err != nil { return false, errors.Trace(err) diff --git a/session.go b/session.go index 91e54c00da..41de0ab1c3 100644 --- a/session.go +++ b/session.go @@ -31,6 +31,7 @@ import ( "github.com/pingcap/tidb/rset" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/db" + "github.com/pingcap/tidb/sessionctx/forupdate" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/stmt" "github.com/pingcap/tidb/stmt/stmts" @@ -139,7 +140,7 @@ func (s *session) FinishTxn(rollback bool) error { err := s.txn.Commit() if err != nil { - log.Errorf("txn:%s, %v", s.txn, err) + log.Warnf("txn:%s, %v", s.txn, err) return errors.Trace(err) } @@ -179,6 +180,10 @@ func (s *session) Retry() error { s.history.history = nh.history }() + if forUpdate := s.Value(forupdate.ForUpdateKey); forUpdate != nil { + return errors.Errorf("can not retry select for update statement") + } + var err error for { // Clear history @@ -192,7 +197,7 @@ func (s *session) Retry() error { if isPreparedStmt(st) { continue } - log.Info("Retry %s", st.OriginText()) + log.Warnf("Retry %s", st.OriginText()) _, err = runStmt(s, st) if err != nil { if errors2.ErrorEqual(err, kv.ErrConditionNotMatch) { diff --git a/sessionctx/forupdate/for_update_ctx.go b/sessionctx/forupdate/for_update_ctx.go new file mode 100644 index 0000000000..109ba0752a --- /dev/null +++ b/sessionctx/forupdate/for_update_ctx.go @@ -0,0 +1,33 @@ +// Copyright 2015 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +// Record information for "select ... for update" statement +package forupdate + +import "github.com/pingcap/tidb/context" + +// A dummy type to avoid naming collision in context. +type forupdateKeyType int + +// String defines a Stringer function for debugging and pretty printing. +func (k forupdateKeyType) String() string { + return "for update" +} + +// ForUpdateKey is used to retrive "select for update" statement information +const ForUpdateKey forupdateKeyType = 0 + +// BindDomain binds domain to context. +func SetForUpdate(ctx context.Context) { + ctx.SetValue(ForUpdateKey, true) +} diff --git a/tidb_test.go b/tidb_test.go index 85b22a0712..93a9148783 100644 --- a/tidb_test.go +++ b/tidb_test.go @@ -545,6 +545,9 @@ func (s *testSessionSuite) TestSelectForUpdate(c *C) { _, err = exec(c, se1, "commit") c.Assert(err, NotNil) + err = se1.Retry() + // retry should fail + c.Assert(err, NotNil) // not conflict mustExecSQL(c, se1, "begin") From 7c24e9d7e8ea1c9af142012a9f0d38266e99073d Mon Sep 17 00:00:00 2001 From: ngaut Date: Tue, 8 Sep 2015 20:47:07 +0800 Subject: [PATCH 03/10] session: Experimental transaction retry Support query like: set a=a+1 --- driver.go | 9 +++- session.go | 117 +++++++++++++++++++++++++++++++++++++++++++++------ tidb_test.go | 7 ++- 3 files changed, 118 insertions(+), 15 deletions(-) diff --git a/driver.go b/driver.go index b32fc5f81b..476ab9eb2e 100644 --- a/driver.go +++ b/driver.go @@ -29,12 +29,14 @@ import ( "sync" "github.com/juju/errors" + "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/model" mysql "github.com/pingcap/tidb/mysqldef" "github.com/pingcap/tidb/rset" "github.com/pingcap/tidb/sessionctx" qerror "github.com/pingcap/tidb/util/errors" "github.com/pingcap/tidb/util/types" + "github.com/reborndb/go/errors2" ) const ( @@ -239,8 +241,13 @@ func (c *driverConn) Commit() error { if c.s == nil { return qerror.ErrCommitNotInTransaction } + _, err := c.s.Execute(txCommitSQL) - if _, err := c.s.Execute(txCommitSQL); err != nil { + if errors2.ErrorEqual(err, kv.ErrConditionNotMatch) { + return c.s.Retry() + } + + if err != nil { return err } diff --git a/session.go b/session.go index b22546357f..eb9602a252 100644 --- a/session.go +++ b/session.go @@ -32,7 +32,10 @@ import ( "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/db" "github.com/pingcap/tidb/sessionctx/variable" + "github.com/pingcap/tidb/stmt" + "github.com/pingcap/tidb/stmt/stmts" "github.com/pingcap/tidb/util/types" + "github.com/reborndb/go/errors2" ) // Session context @@ -51,6 +54,7 @@ type Session interface { DropPreparedStmt(stmtID uint32) error SetClientCapability(uint32) // Set client capability flags Close() error + Retry() error } var ( @@ -58,14 +62,46 @@ var ( sessionID int64 ) +type stmtRecord struct { + stmtID uint32 + st stmt.Statement + params []interface{} +} + +type stmtHistory struct { + history []*stmtRecord +} + +func (h *stmtHistory) add(stmtID uint32, st stmt.Statement, params ...interface{}) { + s := &stmtRecord{ + stmtID: stmtID, + st: st, + params: append(([]interface{})(nil), params...), + } + + h.history = append(h.history, s) +} + +func (h *stmtHistory) reset() { + h.history = h.history[:0] +} + +func (h *stmtHistory) clone() *stmtHistory { + nh := *h + nh.history = make([]*stmtRecord, len(h.history)) + copy(nh.history, h.history) + return &nh +} + type session struct { txn kv.Transaction // Current transaction userName string args []interface{} // Statment execution args, this should be cleaned up after exec - values map[fmt.Stringer]interface{} - store kv.Storage - sid int64 + values map[fmt.Stringer]interface{} + store kv.Storage + sid int64 + history stmtHistory } func (s *session) Status() uint16 { @@ -104,9 +140,11 @@ func (s *session) FinishTxn(rollback bool) error { err := s.txn.Commit() if err != nil { log.Errorf("txn:%s, %v", s.txn, err) + return errors.Trace(err) } - return errors.Trace(err) + s.history.reset() + return nil } func (s *session) String() string { @@ -126,8 +164,55 @@ func (s *session) String() string { return string(b) } +func isPreparedStmt(st stmt.Statement) bool { + switch st.(type) { + case *stmts.PreparedStmt: + return true + default: + return false + } +} + +func (s *session) Retry() error { + nh := s.history.clone() + defer func() { + s.history.history = nh.history + }() + + var err error + for { + // Clear history + s.history.history = s.history.history[:0] + s.FinishTxn(true) + // TODO: check if select for update statement + success := true + for _, sr := range nh.history { + st := sr.st + // Skip prepare statement + if isPreparedStmt(st) { + continue + } + log.Info("Retry %s", st.OriginText()) + _, err = runStmt(s, st) + if err != nil { + if errors2.ErrorEqual(err, kv.ErrConditionNotMatch) { + success = false + break + } + log.Warnf("session:%v, err:%v", s, err) + return errors.Trace(err) + } + } + if success { + return nil + } + } + + return nil +} + func (s *session) Execute(sql string) ([]rset.Recordset, error) { - stmts, err := Compile(sql) + statements, err := Compile(sql) if err != nil { log.Errorf("Syntax error: %s", sql) log.Errorf("Error occurs at %s.", err) @@ -136,13 +221,21 @@ func (s *session) Execute(sql string) ([]rset.Recordset, error) { var rs []rset.Recordset - for _, si := range stmts { - r, err := runStmt(s, si) + for _, st := range statements { + r, err := runStmt(s, st) if err != nil { log.Warnf("session:%v, err:%v", s, err) return nil, errors.Trace(err) } + // Record executed query + if isPreparedStmt(st) { + ps := st.(*stmts.PreparedStmt) + s.history.add(ps.ID, st) + } else { + s.history.add(0, st) + } + if r != nil { rs = append(rs, r) } @@ -183,12 +276,10 @@ func (s *session) ExecutePreparedStmt(stmtID uint32, args ...interface{}) (rset. if err != nil { return nil, err } - //convert args to param - rs, err := executePreparedStmt(s, stmtID, args...) - if err != nil { - return nil, err - } - return rs, nil + + st := &stmts.ExecuteStmt{ID: stmtID} + s.history.add(stmtID, st, args...) + return runStmt(s, st, args...) } func (s *session) DropPreparedStmt(stmtID uint32) error { diff --git a/tidb_test.go b/tidb_test.go index 746d9d50cb..f7eeeffaa8 100644 --- a/tidb_test.go +++ b/tidb_test.go @@ -28,6 +28,7 @@ import ( mysql "github.com/pingcap/tidb/mysqldef" "github.com/pingcap/tidb/rset" "github.com/pingcap/tidb/sessionctx/variable" + "github.com/reborndb/go/errors2" ) var store = flag.String("store", "memory", "registered store name, [memory, goleveldb, boltdb]") @@ -500,7 +501,6 @@ func (s *testSessionSuite) TestAutoicommit(c *C) { // See: http://dev.mysql.com/doc/refman/5.7/en/commit.html func (s *testSessionSuite) TestRowLock(c *C) { - c.Skip("Need retry feature") store := newStore(c, s.dbName) se := newSession(c, store, s.dbName) se1 := newSession(c, store, s.dbName) @@ -522,6 +522,11 @@ func (s *testSessionSuite) TestRowLock(c *C) { _, err := exec(c, se1, "commit") // row lock conflict but can still success + if errors2.ErrorNotEqual(err, kv.ErrConditionNotMatch) { + c.Fail() + } + // Retry should success + err = se.Retry() c.Assert(err, IsNil) mustExecSQL(c, se1, "begin") From 2342cfc10798dac2a5ab7938548df442569efaed Mon Sep 17 00:00:00 2001 From: ngaut Date: Wed, 9 Sep 2015 12:03:21 +0800 Subject: [PATCH 04/10] session: Check if select for update statement Make sure we do not retry "select for update statement" --- plan/plans/lock.go | 2 ++ session.go | 9 +++++-- sessionctx/forupdate/for_update_ctx.go | 33 ++++++++++++++++++++++++++ tidb_test.go | 3 +++ 4 files changed, 45 insertions(+), 2 deletions(-) create mode 100644 sessionctx/forupdate/for_update_ctx.go diff --git a/plan/plans/lock.go b/plan/plans/lock.go index 703637a8a4..6762e870f5 100644 --- a/plan/plans/lock.go +++ b/plan/plans/lock.go @@ -20,6 +20,7 @@ import ( "github.com/pingcap/tidb/field" "github.com/pingcap/tidb/parser/coldef" "github.com/pingcap/tidb/plan" + "github.com/pingcap/tidb/sessionctx/forupdate" "github.com/pingcap/tidb/util/format" ) @@ -48,6 +49,7 @@ func (r *SelectLockPlan) Do(ctx context.Context, f plan.RowIterFunc) error { } } if rowKeys != nil && r.Lock == coldef.SelectLockForUpdate { + forupdate.SetForUpdate(ctx) txn, err := ctx.GetTxn(false) if err != nil { return false, errors.Trace(err) diff --git a/session.go b/session.go index eb9602a252..2005ae972a 100644 --- a/session.go +++ b/session.go @@ -31,6 +31,7 @@ import ( "github.com/pingcap/tidb/rset" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/db" + "github.com/pingcap/tidb/sessionctx/forupdate" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/stmt" "github.com/pingcap/tidb/stmt/stmts" @@ -139,7 +140,7 @@ func (s *session) FinishTxn(rollback bool) error { err := s.txn.Commit() if err != nil { - log.Errorf("txn:%s, %v", s.txn, err) + log.Warnf("txn:%s, %v", s.txn, err) return errors.Trace(err) } @@ -179,6 +180,10 @@ func (s *session) Retry() error { s.history.history = nh.history }() + if forUpdate := s.Value(forupdate.ForUpdateKey); forUpdate != nil { + return errors.Errorf("can not retry select for update statement") + } + var err error for { // Clear history @@ -192,7 +197,7 @@ func (s *session) Retry() error { if isPreparedStmt(st) { continue } - log.Info("Retry %s", st.OriginText()) + log.Warnf("Retry %s", st.OriginText()) _, err = runStmt(s, st) if err != nil { if errors2.ErrorEqual(err, kv.ErrConditionNotMatch) { diff --git a/sessionctx/forupdate/for_update_ctx.go b/sessionctx/forupdate/for_update_ctx.go new file mode 100644 index 0000000000..109ba0752a --- /dev/null +++ b/sessionctx/forupdate/for_update_ctx.go @@ -0,0 +1,33 @@ +// Copyright 2015 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +// Record information for "select ... for update" statement +package forupdate + +import "github.com/pingcap/tidb/context" + +// A dummy type to avoid naming collision in context. +type forupdateKeyType int + +// String defines a Stringer function for debugging and pretty printing. +func (k forupdateKeyType) String() string { + return "for update" +} + +// ForUpdateKey is used to retrive "select for update" statement information +const ForUpdateKey forupdateKeyType = 0 + +// BindDomain binds domain to context. +func SetForUpdate(ctx context.Context) { + ctx.SetValue(ForUpdateKey, true) +} diff --git a/tidb_test.go b/tidb_test.go index f7eeeffaa8..ab142f9394 100644 --- a/tidb_test.go +++ b/tidb_test.go @@ -565,6 +565,9 @@ func (s *testSessionSuite) TestSelectForUpdate(c *C) { _, err = exec(c, se1, "commit") c.Assert(err, NotNil) + err = se1.Retry() + // retry should fail + c.Assert(err, NotNil) // not conflict mustExecSQL(c, se1, "begin") From b1dbfc1e26a5b0fdd98bbe81704ba8559281a1a6 Mon Sep 17 00:00:00 2001 From: ngaut Date: Wed, 9 Sep 2015 12:44:46 +0800 Subject: [PATCH 05/10] forupdate: Fix comments --- sessionctx/forupdate/for_update_ctx.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sessionctx/forupdate/for_update_ctx.go b/sessionctx/forupdate/for_update_ctx.go index 109ba0752a..f413bd1c50 100644 --- a/sessionctx/forupdate/for_update_ctx.go +++ b/sessionctx/forupdate/for_update_ctx.go @@ -11,7 +11,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -// Record information for "select ... for update" statement +// Package forupdate record information for "select ... for update" statement package forupdate import "github.com/pingcap/tidb/context" @@ -27,7 +27,7 @@ func (k forupdateKeyType) String() string { // ForUpdateKey is used to retrive "select for update" statement information const ForUpdateKey forupdateKeyType = 0 -// BindDomain binds domain to context. +// SetForUpdate set "select for update" flag. func SetForUpdate(ctx context.Context) { ctx.SetValue(ForUpdateKey, true) } From 9827f6c18de3b93e2362aa33985a9f98d2914ae6 Mon Sep 17 00:00:00 2001 From: ngaut Date: Wed, 9 Sep 2015 13:34:16 +0800 Subject: [PATCH 06/10] Session: Clean up Reset statement history --- session.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/session.go b/session.go index 2005ae972a..abc415f394 100644 --- a/session.go +++ b/session.go @@ -297,6 +297,7 @@ func (s *session) DropPreparedStmt(stmtID uint32) error { func (s *session) GetTxn(forceNew bool) (kv.Transaction, error) { var err error if s.txn == nil { + s.history.reset() s.txn, err = s.store.Begin() if err != nil { return nil, err @@ -310,6 +311,7 @@ func (s *session) GetTxn(forceNew bool) (kv.Transaction, error) { if err != nil { return nil, err } + s.history.reset() s.txn, err = s.store.Begin() if err != nil { return nil, err From 042ba3b3cfb10f609822adf727f2e43f97979b87 Mon Sep 17 00:00:00 2001 From: ngaut Date: Wed, 9 Sep 2015 15:27:35 +0800 Subject: [PATCH 07/10] session: fix reset and clean up Fix add unique key --- kv/index_iter.go | 6 ++--- kv/iter.go | 2 ++ session.go | 59 ++++++++++++++++++++++++++++++++---------- stmt/stmts/insert.go | 3 +-- table/tables/tables.go | 2 +- 5 files changed, 52 insertions(+), 20 deletions(-) diff --git a/kv/index_iter.go b/kv/index_iter.go index 4e057c6125..78cac4cf77 100644 --- a/kv/index_iter.go +++ b/kv/index_iter.go @@ -23,8 +23,6 @@ import ( "github.com/juju/errors" ) -// Cockroach sql index implementation - var ( _ Index = (*kvIndex)(nil) _ IndexIterator = (*IndexIter)(nil) @@ -138,7 +136,7 @@ func (c *kvIndex) genIndexKey(indexedValues []interface{}, h int64) ([]byte, err } // Create creates a new entry in the kvIndex data. -// If the index is unique and there already exists an entry with the same key, Create will return ErrConditionNotMatch +// If the index is unique and there already exists an entry with the same key, Create will return ErrKeyExists func (c *kvIndex) Create(txn Transaction, indexedValues []interface{}, h int64) error { keyBuf, err := c.genIndexKey(indexedValues, h) if err != nil { @@ -157,7 +155,7 @@ func (c *kvIndex) Create(txn Transaction, indexedValues []interface{}, h int64) return errors.Trace(err) } - return errors.Trace(ErrConditionNotMatch) + return errors.Trace(ErrKeyExists) } // Delete removes the entry for handle h and indexdValues from KV index. diff --git a/kv/iter.go b/kv/iter.go index b914ff44cf..41c211dded 100644 --- a/kv/iter.go +++ b/kv/iter.go @@ -24,6 +24,8 @@ var ( ErrClosed = errors.New("Error: Transaction already closed") // ErrNotExist is used when try to get an entry with an unexist key from KV store. ErrNotExist = errors.New("Error: key not exist") + // ErrKeyExist is used when try to put an entry to KV store. + ErrKeyExists = errors.New("Error: key already exist") // ErrConditionNotMatch is used when condition is not met. ErrConditionNotMatch = errors.New("Error: Condition not match") // ErrLockConflict is used when try to lock an already locked key. diff --git a/session.go b/session.go index abc415f394..fe4b2dc368 100644 --- a/session.go +++ b/session.go @@ -83,8 +83,29 @@ func (h *stmtHistory) add(stmtID uint32, st stmt.Statement, params ...interface{ h.history = append(h.history, s) } +func (h *stmtHistory) del(stmtID uint32) { + pos := -1 + for i, sr := range h.history { + if sr.stmtID == stmtID { + pos = i + break + } + } + + if pos == -1 { + log.Errorf("statement %d not exist", stmtID) + return + } + + count := len(h.history) + h.history[pos] = h.history[count-1] + h.history = h.history[:count-1] +} + func (h *stmtHistory) reset() { - h.history = h.history[:0] + if len(h.history) > 0 { + h.history = h.history[:0] + } } func (h *stmtHistory) clone() *stmtHistory { @@ -98,11 +119,10 @@ type session struct { txn kv.Transaction // Current transaction userName string args []interface{} // Statment execution args, this should be cleaned up after exec - - values map[fmt.Stringer]interface{} - store kv.Storage - sid int64 - history stmtHistory + values map[fmt.Stringer]interface{} + store kv.Storage + sid int64 + history stmtHistory } func (s *session) Status() uint16 { @@ -121,6 +141,11 @@ func (s *session) SetUsername(name string) { s.userName = name } +func (s *session) resetHistory() { + s.ClearValue(forupdate.ForUpdateKey) + s.history.reset() +} + func (s *session) SetClientCapability(capability uint32) { variable.GetSessionVars(s).ClientCapability = capability } @@ -144,7 +169,7 @@ func (s *session) FinishTxn(rollback bool) error { return errors.Trace(err) } - s.history.reset() + s.resetHistory() return nil } @@ -165,6 +190,15 @@ func (s *session) String() string { return string(b) } +func needRetry(st stmt.Statement) bool { + switch st.(type) { + case *stmts.PreparedStmt, *stmts.ShowStmt, *stmts.DoStmt: + return false + default: + return true + } +} + func isPreparedStmt(st stmt.Statement) bool { switch st.(type) { case *stmts.PreparedStmt: @@ -186,15 +220,13 @@ func (s *session) Retry() error { var err error for { - // Clear history - s.history.history = s.history.history[:0] + s.resetHistory() s.FinishTxn(true) - // TODO: check if select for update statement success := true for _, sr := range nh.history { st := sr.st // Skip prepare statement - if isPreparedStmt(st) { + if !needRetry(st) { continue } log.Warnf("Retry %s", st.OriginText()) @@ -288,6 +320,7 @@ func (s *session) ExecutePreparedStmt(stmtID uint32, args ...interface{}) (rset. } func (s *session) DropPreparedStmt(stmtID uint32) error { + s.history.del(stmtID) return dropPreparedStmt(s, stmtID) } @@ -297,7 +330,7 @@ func (s *session) DropPreparedStmt(stmtID uint32) error { func (s *session) GetTxn(forceNew bool) (kv.Transaction, error) { var err error if s.txn == nil { - s.history.reset() + s.resetHistory() s.txn, err = s.store.Begin() if err != nil { return nil, err @@ -311,7 +344,7 @@ func (s *session) GetTxn(forceNew bool) (kv.Transaction, error) { if err != nil { return nil, err } - s.history.reset() + s.resetHistory() s.txn, err = s.store.Begin() if err != nil { return nil, err diff --git a/stmt/stmts/insert.go b/stmt/stmts/insert.go index f75f71e7a7..ceb21d3701 100644 --- a/stmt/stmts/insert.go +++ b/stmt/stmts/insert.go @@ -125,7 +125,6 @@ func (s *InsertIntoStmt) execSelect(t table.Table, cols []*column.Col, ctx conte for i, r := range bufRecords { variable.GetSessionVars(ctx).SetLastInsertID(lastInsertIds[i]) - if _, err = t.AddRecord(ctx, r); err != nil { return nil, errors.Trace(err) } @@ -284,7 +283,7 @@ func (s *InsertIntoStmt) Exec(ctx context.Context) (_ rset.Recordset, err error) if err == nil { continue } - if len(s.OnDuplicate) == 0 || !errors2.ErrorEqual(err, kv.ErrConditionNotMatch) { + if len(s.OnDuplicate) == 0 || !errors2.ErrorEqual(err, kv.ErrKeyExists) { return nil, errors.Trace(err) } // On duplicate key Update the duplicate row. diff --git a/table/tables/tables.go b/table/tables/tables.go index 07b645097b..ed737f1452 100644 --- a/table/tables/tables.go +++ b/table/tables/tables.go @@ -348,7 +348,7 @@ func (t *Table) AddRecord(ctx context.Context, r []interface{}) (recordID int64, } colVals, _ := v.FetchValues(r) if err = v.X.Create(txn, colVals, recordID); err != nil { - if errors2.ErrorEqual(err, kv.ErrConditionNotMatch) { + if errors2.ErrorEqual(err, kv.ErrKeyExists) { // Get the duplicate row handle iter, _, terr := v.X.Seek(txn, colVals) if terr != nil { From 023d3bdb8538b23d8de47cddb4a6e3822ee3b4c2 Mon Sep 17 00:00:00 2001 From: ngaut Date: Wed, 9 Sep 2015 17:25:41 +0800 Subject: [PATCH 08/10] Session: clean up Remove unused methods --- session.go | 20 -------------------- 1 file changed, 20 deletions(-) diff --git a/session.go b/session.go index fe4b2dc368..322a86cc0f 100644 --- a/session.go +++ b/session.go @@ -83,25 +83,6 @@ func (h *stmtHistory) add(stmtID uint32, st stmt.Statement, params ...interface{ h.history = append(h.history, s) } -func (h *stmtHistory) del(stmtID uint32) { - pos := -1 - for i, sr := range h.history { - if sr.stmtID == stmtID { - pos = i - break - } - } - - if pos == -1 { - log.Errorf("statement %d not exist", stmtID) - return - } - - count := len(h.history) - h.history[pos] = h.history[count-1] - h.history = h.history[:count-1] -} - func (h *stmtHistory) reset() { if len(h.history) > 0 { h.history = h.history[:0] @@ -320,7 +301,6 @@ func (s *session) ExecutePreparedStmt(stmtID uint32, args ...interface{}) (rset. } func (s *session) DropPreparedStmt(stmtID uint32) error { - s.history.del(stmtID) return dropPreparedStmt(s, stmtID) } From 0061d29cf2a95a3b565b1255bc40505d11ee762c Mon Sep 17 00:00:00 2001 From: ngaut Date: Wed, 9 Sep 2015 18:24:56 +0800 Subject: [PATCH 09/10] kv: typo --- kv/iter.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kv/iter.go b/kv/iter.go index 41c211dded..09ec2197f7 100644 --- a/kv/iter.go +++ b/kv/iter.go @@ -24,7 +24,7 @@ var ( ErrClosed = errors.New("Error: Transaction already closed") // ErrNotExist is used when try to get an entry with an unexist key from KV store. ErrNotExist = errors.New("Error: key not exist") - // ErrKeyExist is used when try to put an entry to KV store. + // ErrKeyExists is used when try to put an entry to KV store. ErrKeyExists = errors.New("Error: key already exist") // ErrConditionNotMatch is used when condition is not met. ErrConditionNotMatch = errors.New("Error: Condition not match") From ccef9934ab48108433bb7d32690d845b5185975e Mon Sep 17 00:00:00 2001 From: ngaut Date: Wed, 9 Sep 2015 18:33:02 +0800 Subject: [PATCH 10/10] Fix auto import --- driver.go | 2 +- session.go | 2 +- tidb_test.go | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/driver.go b/driver.go index 476ab9eb2e..ab5a2e01fb 100644 --- a/driver.go +++ b/driver.go @@ -35,8 +35,8 @@ import ( "github.com/pingcap/tidb/rset" "github.com/pingcap/tidb/sessionctx" qerror "github.com/pingcap/tidb/util/errors" + "github.com/pingcap/tidb/util/errors2" "github.com/pingcap/tidb/util/types" - "github.com/reborndb/go/errors2" ) const ( diff --git a/session.go b/session.go index 322a86cc0f..3a1f124268 100644 --- a/session.go +++ b/session.go @@ -35,8 +35,8 @@ import ( "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/stmt" "github.com/pingcap/tidb/stmt/stmts" + "github.com/pingcap/tidb/util/errors2" "github.com/pingcap/tidb/util/types" - "github.com/reborndb/go/errors2" ) // Session context diff --git a/tidb_test.go b/tidb_test.go index ab142f9394..a269c1af8b 100644 --- a/tidb_test.go +++ b/tidb_test.go @@ -28,7 +28,7 @@ import ( mysql "github.com/pingcap/tidb/mysqldef" "github.com/pingcap/tidb/rset" "github.com/pingcap/tidb/sessionctx/variable" - "github.com/reborndb/go/errors2" + "github.com/pingcap/tidb/util/errors2" ) var store = flag.String("store", "memory", "registered store name, [memory, goleveldb, boltdb]")