diff --git a/bootstrap_test.go b/bootstrap_test.go index c886cc517c..32386e3b40 100644 --- a/bootstrap_test.go +++ b/bootstrap_test.go @@ -25,6 +25,7 @@ import ( "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/util/auth" "github.com/pingcap/tidb/util/testleak" + goctx "golang.org/x/net/context" ) var _ = Suite(&testBootstrapSuite{}) @@ -195,7 +196,7 @@ func (s *testBootstrapSuite) TestUpgrade(c *C) { m := meta.NewMeta(txn) err = m.FinishBootstrap(int64(1)) c.Assert(err, IsNil) - err = txn.Commit() + err = txn.Commit(goctx.Background()) c.Assert(err, IsNil) mustExecSQL(c, se1, `delete from mysql.TiDB where VARIABLE_NAME="tidb_server_version";`) mustExecSQL(c, se1, fmt.Sprintf(`delete from mysql.global_variables where VARIABLE_NAME="%s";`, diff --git a/cmd/benchkv/main.go b/cmd/benchkv/main.go index 041336e2a2..60d8165779 100644 --- a/cmd/benchkv/main.go +++ b/cmd/benchkv/main.go @@ -30,6 +30,7 @@ import ( "github.com/pingcap/tidb/store/tikv" "github.com/pingcap/tidb/terror" "github.com/prometheus/client_golang/prometheus" + goctx "golang.org/x/net/context" ) var ( @@ -102,7 +103,7 @@ func batchRW(value []byte) { key := fmt.Sprintf("key_%d", k) err = txn.Set([]byte(key), value) terror.Log(errors.Trace(err)) - err = txn.Commit() + err = txn.Commit(goctx.Background()) if err != nil { txnRolledbackCounter.WithLabelValues("txn").Inc() terror.Call(txn.Rollback) diff --git a/ddl/column_change_test.go b/ddl/column_change_test.go index 727224f8ed..fd42c8b0d0 100644 --- a/ddl/column_change_test.go +++ b/ddl/column_change_test.go @@ -68,7 +68,7 @@ func (s *testColumnChangeSuite) TestColumnChange(c *C) { row := types.MakeDatums(1, 2) h, err := originTable.AddRecord(ctx, row) c.Assert(err, IsNil) - err = ctx.Txn().Commit() + err = ctx.Txn().Commit(goctx.Background()) c.Assert(err, IsNil) var mu sync.Mutex @@ -120,7 +120,7 @@ func (s *testColumnChangeSuite) TestColumnChange(c *C) { } mu.Unlock() } - err = hookCtx.Txn().Commit() + err = hookCtx.Txn().Commit(goctx.Background()) if err != nil { checkErr = errors.Trace(err) } @@ -172,7 +172,7 @@ func (s *testColumnChangeSuite) testAddColumnNoDefault(c *C, ctx context.Context checkErr = errors.Trace(err) } } - err = hookCtx.Txn().Commit() + err = hookCtx.Txn().Commit(ctx.GoCtx()) if err != nil { checkErr = errors.Trace(err) } diff --git a/ddl/column_test.go b/ddl/column_test.go index 3f8859ba68..722e04eac6 100644 --- a/ddl/column_test.go +++ b/ddl/column_test.go @@ -261,7 +261,7 @@ func (s *testColumnSuite) checkColumnKVExist(ctx context.Context, t table.Table, if err != nil { return errors.Trace(err) } - defer ctx.Txn().Commit() + defer ctx.Txn().Commit(goctx.Background()) key := t.RecordKey(handle) data, err := ctx.Txn().Get(key) if !isExist { @@ -751,7 +751,7 @@ func (s *testColumnSuite) TestAddColumn(c *C) { handle, err := t.AddRecord(ctx, oldRow) c.Assert(err, IsNil) - err = ctx.Txn().Commit() + err = ctx.Txn().Commit(goctx.Background()) c.Assert(err, IsNil) newColName := "c4" @@ -814,7 +814,7 @@ func (s *testColumnSuite) TestAddColumn(c *C) { job = testDropTable(c, ctx, d, s.dbInfo, tblInfo) testCheckJobDone(c, d, job, false) - err = ctx.Txn().Commit() + err = ctx.Txn().Commit(goctx.Background()) c.Assert(err, IsNil) d.Stop() @@ -839,7 +839,7 @@ func (s *testColumnSuite) TestDropColumn(c *C) { _, err = t.AddRecord(ctx, append(row, types.NewDatum(defaultColValue))) c.Assert(err, IsNil) - err = ctx.Txn().Commit() + err = ctx.Txn().Commit(goctx.Background()) c.Assert(err, IsNil) checkOK := false @@ -888,7 +888,7 @@ func (s *testColumnSuite) TestDropColumn(c *C) { job = testDropTable(c, ctx, d, s.dbInfo, tblInfo) testCheckJobDone(c, d, job, false) - err = ctx.Txn().Commit() + err = ctx.Txn().Commit(goctx.Background()) c.Assert(err, IsNil) d.Stop() diff --git a/ddl/ddl_db_test.go b/ddl/ddl_db_test.go index 8b77d616e2..37d0651816 100644 --- a/ddl/ddl_db_test.go +++ b/ddl/ddl_db_test.go @@ -46,6 +46,7 @@ import ( "github.com/pingcap/tidb/util/testleak" "github.com/pingcap/tidb/util/testutil" "github.com/pingcap/tidb/util/types" + goctx "golang.org/x/net/context" ) const ( @@ -343,7 +344,7 @@ func (s *testDBSuite) TestCancelAddIndex(c *C) { checkErr = errors.Trace(errs[0]) return } - err = hookCtx.Txn().Commit() + err = hookCtx.Txn().Commit(goctx.Background()) if err != nil { checkErr = errors.Trace(err) } diff --git a/ddl/ddl_worker_test.go b/ddl/ddl_worker_test.go index 389b673733..acd364a166 100644 --- a/ddl/ddl_worker_test.go +++ b/ddl/ddl_worker_test.go @@ -324,7 +324,7 @@ func (s *testDDLSuite) TestCancelJob(c *C) { row := types.MakeDatums(1, 2) _, err = originTable.AddRecord(ctx, row) c.Assert(err, IsNil) - err = ctx.Txn().Commit() + err = ctx.Txn().Commit(goctx.Background()) c.Assert(err, IsNil) tc := &TestDDLCallback{} @@ -346,7 +346,7 @@ func (s *testDDLSuite) TestCancelJob(c *C) { return } checkCancelState(hookCtx.Txn(), job, test) - err = hookCtx.Txn().Commit() + err = hookCtx.Txn().Commit(goctx.Background()) if err != nil { checkErr = errors.Trace(err) return @@ -374,7 +374,7 @@ func (s *testDDLSuite) TestCancelJob(c *C) { test = &tests[3] testCreateIndex(c, ctx, d, dbInfo, tblInfo, false, "idx", "c2") c.Check(errors.ErrorStack(checkErr), Equals, "") - c.Assert(ctx.Txn().Commit(), IsNil) + c.Assert(ctx.Txn().Commit(goctx.Background()), IsNil) // for dropping index idxName := []interface{}{model.NewCIStr("idx")} diff --git a/ddl/foreign_key_test.go b/ddl/foreign_key_test.go index e5acf701f7..47b94e1dbd 100644 --- a/ddl/foreign_key_test.go +++ b/ddl/foreign_key_test.go @@ -126,7 +126,7 @@ func (s *testForeighKeySuite) TestForeignKey(c *C) { testCreateTable(c, ctx, d, s.dbInfo, tblInfo) - err = ctx.Txn().Commit() + err = ctx.Txn().Commit(goctx.Background()) c.Assert(err, IsNil) // fix data race @@ -160,7 +160,7 @@ func (s *testForeighKeySuite) TestForeignKey(c *C) { job := s.testCreateForeignKey(c, tblInfo, "c1_fk", []string{"c1"}, "t2", []string{"c1"}, ast.ReferOptionCascade, ast.ReferOptionSetNull) testCheckJobDone(c, d, job, true) - err = ctx.Txn().Commit() + err = ctx.Txn().Commit(goctx.Background()) c.Assert(err, IsNil) mu.Lock() hErr := hookErr @@ -218,6 +218,6 @@ func (s *testForeighKeySuite) TestForeignKey(c *C) { job = testDropTable(c, ctx, d, s.dbInfo, tblInfo) testCheckJobDone(c, d, job, false) - err = ctx.Txn().Commit() + err = ctx.Txn().Commit(goctx.Background()) c.Assert(err, IsNil) } diff --git a/ddl/index_change_test.go b/ddl/index_change_test.go index dd19d85997..bd5507da28 100644 --- a/ddl/index_change_test.go +++ b/ddl/index_change_test.go @@ -69,7 +69,7 @@ func (s *testIndexChangeSuite) TestIndexChange(c *C) { _, err = originTable.AddRecord(ctx, types.MakeDatums(3, 3)) c.Assert(err, IsNil) - err = ctx.Txn().Commit() + err = ctx.Txn().Commit(goctx.Background()) c.Assert(err, IsNil) tc := &TestDDLCallback{} @@ -120,7 +120,7 @@ func (s *testIndexChangeSuite) TestIndexChange(c *C) { d.SetHook(tc) testCreateIndex(c, ctx, d, s.dbInfo, originTable.Meta(), false, "c2", "c2") c.Check(errors.ErrorStack(checkErr), Equals, "") - c.Assert(ctx.Txn().Commit(), IsNil) + c.Assert(ctx.Txn().Commit(goctx.Background()), IsNil) d.Stop() prevState = model.StateNone var noneTable table.Table @@ -317,7 +317,7 @@ func (s *testIndexChangeSuite) checkAddPublic(d *ddl, ctx context.Context, write return errors.Trace(err) } } - return ctx.Txn().Commit() + return ctx.Txn().Commit(goctx.Background()) } func (s *testIndexChangeSuite) checkDropWriteOnly(d *ddl, ctx context.Context, publicTbl, writeTbl table.Table) error { @@ -357,7 +357,7 @@ func (s *testIndexChangeSuite) checkDropWriteOnly(d *ddl, ctx context.Context, p if err != nil { return errors.Trace(err) } - return ctx.Txn().Commit() + return ctx.Txn().Commit(goctx.Background()) } func (s *testIndexChangeSuite) checkDropDeleteOnly(d *ddl, ctx context.Context, writeTbl, delTbl table.Table) error { @@ -402,5 +402,5 @@ func (s *testIndexChangeSuite) checkDropDeleteOnly(d *ddl, ctx context.Context, if err != nil { return errors.Trace(err) } - return ctx.Txn().Commit() + return ctx.Txn().Commit(goctx.Background()) } diff --git a/ddl/reorg_test.go b/ddl/reorg_test.go index 8f17c095a4..e858df42fe 100644 --- a/ddl/reorg_test.go +++ b/ddl/reorg_test.go @@ -58,7 +58,7 @@ func (s *testDDLSuite) TestReorg(c *C) { err = ctx.NewTxn() c.Assert(err, IsNil) ctx.Txn().Set([]byte("a"), []byte("b")) - err = ctx.Txn().Commit() + err = ctx.Txn().Commit(goctx.Background()) c.Assert(err, IsNil) rowCount := int64(10) @@ -150,7 +150,7 @@ func (s *testDDLSuite) TestReorgOwner(c *C) { c.Assert(err, IsNil) } - err := ctx.Txn().Commit() + err := ctx.Txn().Commit(goctx.Background()) c.Assert(err, IsNil) tc := &TestDDLCallback{} diff --git a/executor/executor_test.go b/executor/executor_test.go index f7fafba026..95b56f800b 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -184,7 +184,7 @@ func (s *testSuite) TestAdmin(c *C) { c.Assert(tb.Indices(), HasLen, 1) _, err = tb.Indices()[0].Create(txn, types.MakeDatums(int64(10)), 1) c.Assert(err, IsNil) - err = txn.Commit() + err = txn.Commit(goctx.Background()) c.Assert(err, IsNil) r, err = tk.Exec("admin check table admin_test") c.Assert(err, NotNil) @@ -222,7 +222,7 @@ func checkCases(tests []testCase, ld *executor.LoadDataInfo, c.Assert(data, DeepEquals, tt.restData, Commentf("data1:%v, data2:%v, data:%v", string(tt.data1), string(tt.data2), string(data))) } - err1 = ctx.Txn().Commit() + err1 = ctx.Txn().Commit(goctx.Background()) c.Assert(err1, IsNil) r := tk.MustQuery(selectSQL) r.Check(testutil.RowsWithSep("|", tt.expected...)) diff --git a/executor/simple.go b/executor/simple.go index 82404a2ef8..ef948e4a78 100644 --- a/executor/simple.go +++ b/executor/simple.go @@ -211,7 +211,7 @@ func (e *SimpleExec) executeAlterUser(s *ast.AlterUserStmt) error { } if len(failedUsers) > 0 { // Commit the transaction even if we returns error - err := e.ctx.Txn().Commit() + err := e.ctx.Txn().Commit(e.ctx.GoCtx()) if err != nil { return errors.Trace(err) } @@ -242,7 +242,7 @@ func (e *SimpleExec) executeDropUser(s *ast.DropUserStmt) error { } if len(failedUsers) > 0 { // Commit the transaction even if we returns error - err := e.ctx.Txn().Commit() + err := e.ctx.Txn().Commit(e.ctx.GoCtx()) if err != nil { return errors.Trace(err) } diff --git a/kv/fault_injection.go b/kv/fault_injection.go index c6aafe9f5e..aab0323013 100644 --- a/kv/fault_injection.go +++ b/kv/fault_injection.go @@ -13,7 +13,11 @@ package kv -import "sync" +import ( + "sync" + + goctx "golang.org/x/net/context" +) // InjectionConfig is used for fault injections for KV components. type InjectionConfig struct { @@ -95,13 +99,13 @@ func (t *InjectedTransaction) Get(k Key) ([]byte, error) { } // Commit returns an error if cfg.commitError is set. -func (t *InjectedTransaction) Commit() error { +func (t *InjectedTransaction) Commit(ctx goctx.Context) error { t.cfg.RLock() defer t.cfg.RUnlock() if t.cfg.commitError != nil { return t.cfg.commitError } - return t.Transaction.Commit() + return t.Transaction.Commit(ctx) } // InjectedSnapshot wraps a Snapshot with injections. diff --git a/kv/kv.go b/kv/kv.go index cb3d05714e..7e38fa379d 100644 --- a/kv/kv.go +++ b/kv/kv.go @@ -116,7 +116,7 @@ type MemBuffer interface { type Transaction interface { MemBuffer // Commit commits the transaction operations to KV store. - Commit() error + Commit(goctx.Context) error // Rollback undoes the transaction operations to KV store. Rollback() error // String implements fmt.Stringer interface. diff --git a/kv/mock.go b/kv/mock.go index 85e3c01897..4d554d9e7b 100644 --- a/kv/mock.go +++ b/kv/mock.go @@ -16,6 +16,7 @@ package kv import ( "github.com/juju/errors" "github.com/pingcap/tidb/store/tikv/oracle" + goctx "golang.org/x/net/context" ) // mockTxn is a txn that returns a retryAble error when called Commit. @@ -25,7 +26,7 @@ type mockTxn struct { } // Always returns a retryable error. -func (t *mockTxn) Commit() error { +func (t *mockTxn) Commit(ctx goctx.Context) error { return ErrRetryable } diff --git a/kv/mock_test.go b/kv/mock_test.go index 273ee88624..047cf2749c 100644 --- a/kv/mock_test.go +++ b/kv/mock_test.go @@ -15,6 +15,7 @@ package kv import ( . "github.com/pingcap/check" + goctx "golang.org/x/net/context" ) var _ = Suite(testMockSuite{}) @@ -48,7 +49,7 @@ func (s testMockSuite) TestInterface(c *C) { transaction.Seek(Key("lock")) transaction.SeekReverse(Key("lock")) } - transaction.Commit() + transaction.Commit(goctx.Background()) transaction, err = storage.Begin() c.Check(err, IsNil) diff --git a/kv/txn.go b/kv/txn.go index 0ab90d0112..14b17da310 100644 --- a/kv/txn.go +++ b/kv/txn.go @@ -21,6 +21,7 @@ import ( log "github.com/Sirupsen/logrus" "github.com/juju/errors" "github.com/pingcap/tidb/terror" + goctx "golang.org/x/net/context" ) // RunInNewTxn will run the f in a new transaction environment. @@ -54,7 +55,7 @@ func RunInNewTxn(store Storage, retryable bool, f func(txn Transaction) error) e return errors.Trace(err) } - err = txn.Commit() + err = txn.Commit(goctx.Background()) if retryable && IsRetryableError(err) { log.Warnf("[kv] Retry txn %v original txn %v err %v", txn, originalTxnTS, err) err1 := txn.Rollback() diff --git a/meta/meta_test.go b/meta/meta_test.go index de2f019731..c5a616c5d9 100644 --- a/meta/meta_test.go +++ b/meta/meta_test.go @@ -23,6 +23,7 @@ import ( "github.com/pingcap/tidb/store/localstore" "github.com/pingcap/tidb/store/localstore/goleveldb" "github.com/pingcap/tidb/util/testleak" + goctx "golang.org/x/net/context" ) func TestT(t *testing.T) { @@ -233,7 +234,7 @@ func (s *testSuite) TestMeta(c *C) { readDiff, err := t.GetSchemaDiff(schemaDiff.Version) c.Assert(readDiff, DeepEquals, schemaDiff) - err = txn.Commit() + err = txn.Commit(goctx.Background()) c.Assert(err, IsNil) } @@ -249,7 +250,7 @@ func (s *testSuite) TestSnapshot(c *C) { m.GenGlobalID() n, _ := m.GetGlobalID() c.Assert(n, Equals, int64(1)) - txn.Commit() + txn.Commit(goctx.Background()) ver1, _ := store.CurrentVersion() time.Sleep(time.Millisecond) @@ -258,7 +259,7 @@ func (s *testSuite) TestSnapshot(c *C) { m.GenGlobalID() n, _ = m.GetGlobalID() c.Assert(n, Equals, int64(2)) - txn.Commit() + txn.Commit(goctx.Background()) snapshot, _ := store.GetSnapshot(ver1) snapMeta := meta.NewSnapshotMeta(snapshot) @@ -327,6 +328,6 @@ func (s *testSuite) TestDDL(c *C) { lastID = job.ID } - err = txn.Commit() + err = txn.Commit(goctx.Background()) c.Assert(err, IsNil) } diff --git a/server/conn.go b/server/conn.go index 62810c519d..344614b47b 100644 --- a/server/conn.go +++ b/server/conn.go @@ -720,7 +720,7 @@ func (cc *clientConn) handleLoadData(loadDataInfo *executor.LoadDataInfo) error } return errors.Trace(err) } - return errors.Trace(txn.Commit()) + return errors.Trace(txn.Commit(loadDataInfo.Ctx.GoCtx())) } // handleQuery executes the sql query string and writes result set or result ok to the client. diff --git a/session.go b/session.go index 4032490ea8..52f5ce37df 100644 --- a/session.go +++ b/session.go @@ -292,7 +292,7 @@ func (s *session) doCommit() error { schemaVer: s.sessionVars.TxnCtx.SchemaVersion, relatedTableIDs: tableIDs, }) - if err := s.txn.Commit(); err != nil { + if err := s.txn.Commit(s.GoCtx()); err != nil { return errors.Trace(err) } return nil diff --git a/store/localstore/compactor_test.go b/store/localstore/compactor_test.go index 94590f9e3e..530443652d 100644 --- a/store/localstore/compactor_test.go +++ b/store/localstore/compactor_test.go @@ -20,6 +20,7 @@ import ( "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/store/localstore/engine" "github.com/pingcap/tidb/util/testleak" + goctx "golang.org/x/net/context" ) var _ = Suite(&testLocalstoreCompactorSuite{}) @@ -60,22 +61,22 @@ func (s *testLocalstoreCompactorSuite) TestCompactor(c *C) { txn, _ := store.Begin() txn.Set([]byte("a"), []byte("1")) - txn.Commit() + txn.Commit(goctx.Background()) txn, _ = store.Begin() txn.Set([]byte("a"), []byte("2")) - txn.Commit() + txn.Commit(goctx.Background()) txn, _ = store.Begin() txn.Set([]byte("a"), []byte("3")) - txn.Commit() + txn.Commit(goctx.Background()) txn, _ = store.Begin() txn.Set([]byte("a"), []byte("3")) - txn.Commit() + txn.Commit(goctx.Background()) txn, _ = store.Begin() txn.Set([]byte("a"), []byte("4")) - txn.Commit() + txn.Commit(goctx.Background()) txn, _ = store.Begin() txn.Set([]byte("a"), []byte("5")) - txn.Commit() + txn.Commit(goctx.Background()) t := count(db) c.Assert(t, Equals, 6) @@ -84,7 +85,7 @@ func (s *testLocalstoreCompactorSuite) TestCompactor(c *C) { // Touch a, tigger GC txn, _ = store.Begin() txn.Set([]byte("a"), []byte("b")) - txn.Commit() + txn.Commit(goctx.Background()) time.Sleep(1 * time.Second) // Do background GC t = count(db) @@ -100,16 +101,16 @@ func (s *testLocalstoreCompactorSuite) TestGetAllVersions(c *C) { compactor := store.(*dbStore).compactor txn, _ := store.Begin() txn.Set([]byte("a"), []byte("1")) - txn.Commit() + txn.Commit(goctx.Background()) txn, _ = store.Begin() txn.Set([]byte("a"), []byte("2")) - txn.Commit() + txn.Commit(goctx.Background()) txn, _ = store.Begin() txn.Set([]byte("b"), []byte("1")) - txn.Commit() + txn.Commit(goctx.Background()) txn, _ = store.Begin() txn.Set([]byte("b"), []byte("2")) - txn.Commit() + txn.Commit(goctx.Background()) keys, err := compactor.getAllVersions([]byte("a")) c.Assert(err, IsNil) diff --git a/store/localstore/mvcc_test.go b/store/localstore/mvcc_test.go index d3375b3405..fbf2579b6d 100644 --- a/store/localstore/mvcc_test.go +++ b/store/localstore/mvcc_test.go @@ -21,6 +21,7 @@ import ( . "github.com/pingcap/check" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/store/localstore/goleveldb" + goctx "golang.org/x/net/context" ) var _ = Suite(&testMvccSuite{}) @@ -82,7 +83,7 @@ func (t *testMvccSuite) SetUpTest(c *C) { err := txn.Set(val, val) c.Assert(err, IsNil) } - txn.Commit() + txn.Commit(goctx.Background()) } func (t *testMvccSuite) TearDownSuite(c *C) { @@ -99,7 +100,7 @@ func (t *testMvccSuite) TestMvccGet(c *C) { k = encodeInt(1024) _, err = txn.Get(k) c.Assert(err, NotNil) - txn.Commit() + txn.Commit(goctx.Background()) } func (t *testMvccSuite) TestMvccPutAndDel(c *C) { @@ -111,7 +112,7 @@ func (t *testMvccSuite) TestMvccPutAndDel(c *C) { err = txn.Delete(val) c.Assert(err, IsNil) } - txn.Commit() + txn.Commit(goctx.Background()) txn, _ = t.s.Begin() _, err = txn.Get(encodeInt(0)) @@ -119,7 +120,7 @@ func (t *testMvccSuite) TestMvccPutAndDel(c *C) { v, err := txn.Get(encodeInt(4)) c.Assert(err, IsNil) c.Assert(len(v), Greater, 0) - txn.Commit() + txn.Commit(goctx.Background()) cnt := 0 t.scanRawEngine(c, func(k, v []byte) { @@ -129,7 +130,7 @@ func (t *testMvccSuite) TestMvccPutAndDel(c *C) { txn.Set(encodeInt(0), []byte("v")) _, err = txn.Get(encodeInt(0)) c.Assert(err, IsNil) - txn.Commit() + txn.Commit(goctx.Background()) cnt1 := 0 t.scanRawEngine(c, func(k, v []byte) { @@ -147,7 +148,7 @@ func (t *testMvccSuite) TestMvccNext(c *C) { err = it.Next() c.Assert(err, IsNil) } - txn.Commit() + txn.Commit(goctx.Background()) } func encodeTestDataKey(i int) []byte { @@ -158,7 +159,7 @@ func (t *testMvccSuite) TestSnapshotGet(c *C) { tx, _ := t.s.Begin() b, err := tx.Get(encodeInt(1)) c.Assert(err, IsNil) - tx.Commit() + tx.Commit(goctx.Background()) lastVer, err := globalVersionProvider.CurrentVersion() c.Assert(err, IsNil) @@ -166,7 +167,7 @@ func (t *testMvccSuite) TestSnapshotGet(c *C) { tx, _ = t.s.Begin() err = tx.Set(encodeInt(1), []byte("new")) c.Assert(err, IsNil) - err = tx.Commit() + err = tx.Commit(goctx.Background()) c.Assert(err, IsNil) testKey := encodeTestDataKey(1) @@ -222,7 +223,7 @@ func (t *testMvccSuite) TestMvccSeek(c *C) { c.Assert(err, IsNil) err = txn.Set(encodeInt(3), encodeInt(1003)) c.Assert(err, IsNil) - err = txn.Commit() + err = txn.Commit(goctx.Background()) c.Assert(err, IsNil) v1, err := globalVersionProvider.CurrentVersion() c.Assert(err, IsNil) @@ -231,7 +232,7 @@ func (t *testMvccSuite) TestMvccSeek(c *C) { c.Assert(err, IsNil) err = txn.Delete(encodeInt(2)) c.Assert(err, IsNil) - err = txn.Commit() + err = txn.Commit(goctx.Background()) c.Assert(err, IsNil) v2, err := globalVersionProvider.CurrentVersion() c.Assert(err, IsNil) @@ -274,7 +275,7 @@ func (t *testMvccSuite) TestReverseMvccSeek(c *C) { c.Assert(err, IsNil) err = txn.Set(encodeInt(3), encodeInt(1003)) c.Assert(err, IsNil) - err = txn.Commit() + err = txn.Commit(goctx.Background()) c.Assert(err, IsNil) v1, err := globalVersionProvider.CurrentVersion() c.Assert(err, IsNil) @@ -283,7 +284,7 @@ func (t *testMvccSuite) TestReverseMvccSeek(c *C) { c.Assert(err, IsNil) err = txn.Delete(encodeInt(4)) c.Assert(err, IsNil) - err = txn.Commit() + err = txn.Commit(goctx.Background()) c.Assert(err, IsNil) v2, err := globalVersionProvider.CurrentVersion() c.Assert(err, IsNil) @@ -313,7 +314,7 @@ func (t *testMvccSuite) TestMvccSuiteGetLatest(c *C) { tx, _ := t.s.Begin() err := tx.Set(encodeInt(5), encodeInt(100+i)) c.Assert(err, IsNil) - err = tx.Commit() + err = tx.Commit(goctx.Background()) c.Assert(err, IsNil) } // we can always read newest data @@ -326,24 +327,24 @@ func (t *testMvccSuite) TestMvccSuiteGetLatest(c *C) { c.Assert(err, IsNil) c.Assert(it.Valid(), IsTrue) c.Assert(string(it.Value()), Equals, string(encodeInt(100+9))) - tx.Commit() + tx.Commit(goctx.Background()) testKey := []byte("testKey") txn0, _ := t.s.Begin() txn0.Set(testKey, []byte("0")) - txn0.Commit() + txn0.Commit(goctx.Background()) txn1, _ := t.s.Begin() { // Commit another version txn2, _ := t.s.Begin() txn2.Set(testKey, []byte("2")) - txn2.Commit() + txn2.Commit(goctx.Background()) } r, err := txn1.Get(testKey) c.Assert(err, IsNil) // Test isolation in transaction. c.Assert(string(r), Equals, "0") - txn1.Commit() + txn1.Commit(goctx.Background()) } func (t *testMvccSuite) TestBufferedIterator(c *C) { @@ -355,7 +356,7 @@ func (t *testMvccSuite) TestBufferedIterator(c *C) { tx.Set([]byte{0x0, 0xee, 0xff}, []byte("4")) tx.Set([]byte{0xff, 0xff, 0xee, 0xff}, []byte("5")) tx.Set([]byte{0xff, 0xff, 0xff}, []byte("6")) - tx.Commit() + tx.Commit(goctx.Background()) tx, _ = s.Begin() iter, err := tx.Seek([]byte{0}) @@ -366,7 +367,7 @@ func (t *testMvccSuite) TestBufferedIterator(c *C) { c.Assert(err, IsNil) cnt++ } - tx.Commit() + tx.Commit(goctx.Background()) c.Assert(cnt, Equals, 6) tx, _ = s.Begin() @@ -374,7 +375,7 @@ func (t *testMvccSuite) TestBufferedIterator(c *C) { c.Assert(err, IsNil) c.Assert(it.Valid(), IsTrue) c.Assert(string(it.Key()), Equals, "\xff\xff\xee\xff") - tx.Commit() + tx.Commit(goctx.Background()) // no such key tx, _ = s.Begin() @@ -386,7 +387,7 @@ func (t *testMvccSuite) TestBufferedIterator(c *C) { c.Assert(err, IsNil) c.Assert(it.Valid(), IsTrue) c.Assert(it.Value(), DeepEquals, []byte("2")) - tx.Commit() + tx.Commit(goctx.Background()) tx, _ = s.Begin() iter, err = tx.SeekReverse(nil) @@ -397,7 +398,7 @@ func (t *testMvccSuite) TestBufferedIterator(c *C) { c.Assert(err, IsNil) cnt++ } - tx.Commit() + tx.Commit(goctx.Background()) c.Assert(cnt, Equals, 6) tx, _ = s.Begin() @@ -405,7 +406,7 @@ func (t *testMvccSuite) TestBufferedIterator(c *C) { c.Assert(err, IsNil) c.Assert(it.Valid(), IsTrue) c.Assert(string(it.Key()), Equals, "\xff\xff\xee\xff") - tx.Commit() + tx.Commit(goctx.Background()) // no such key tx, _ = s.Begin() @@ -417,7 +418,7 @@ func (t *testMvccSuite) TestBufferedIterator(c *C) { c.Assert(err, IsNil) c.Assert(it.Valid(), IsTrue) c.Assert(it.Value(), DeepEquals, []byte("1")) - tx.Commit() + tx.Commit(goctx.Background()) } func encodeInt(n int) []byte { diff --git a/store/localstore/txn.go b/store/localstore/txn.go index 8e1104582b..87752db9f9 100644 --- a/store/localstore/txn.go +++ b/store/localstore/txn.go @@ -20,6 +20,7 @@ import ( "github.com/juju/errors" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/terror" + goctx "golang.org/x/net/context" ) var ( @@ -115,7 +116,7 @@ func (txn *dbTxn) doCommit() error { return txn.store.CommitTxn(txn) } -func (txn *dbTxn) Commit() error { +func (txn *dbTxn) Commit(goctx.Context) error { if !txn.valid { return errors.Trace(kv.ErrInvalidTxn) } diff --git a/store/localstore/xapi_test.go b/store/localstore/xapi_test.go index b9e897285c..cb560e87c2 100644 --- a/store/localstore/xapi_test.go +++ b/store/localstore/xapi_test.go @@ -86,7 +86,7 @@ func (s *testXAPISuite) TestSelect(c *C) { c.Assert([]byte(chunk.RowsData[dataOffset:dataOffset+rowMeta.Length]), BytesEquals, expectedEncoded) dataOffset += rowMeta.Length } - txn.Commit() + txn.Commit(goctx.Background()) // Select Index request. txn, err = store.Begin() @@ -109,7 +109,7 @@ func (s *testXAPISuite) TestSelect(c *C) { for i, h := range handles { c.Assert(h, Equals, i+1) } - txn.Commit() + txn.Commit(goctx.Background()) store.Close() } @@ -187,7 +187,7 @@ func prepareTableData(store kv.Storage, tbl *simpleTableInfo, count int64, gen g for i := int64(1); i <= count; i++ { setRow(txn, i, tbl, gen) } - return txn.Commit() + return txn.Commit(goctx.Background()) } func setRow(txn kv.Transaction, handle int64, tbl *simpleTableInfo, gen genValueFunc) error { diff --git a/store/store_test.go b/store/store_test.go index adf502abae..bfdff6663a 100644 --- a/store/store_test.go +++ b/store/store_test.go @@ -30,6 +30,7 @@ import ( "github.com/pingcap/tidb/store/localstore/goleveldb" "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/testleak" + goctx "golang.org/x/net/context" ) var ( @@ -175,12 +176,12 @@ func (s *testKVSuite) TestGetSet(c *C) { mustGet(c, txn) // Check transaction results - err = txn.Commit() + err = txn.Commit(goctx.Background()) c.Assert(err, IsNil) txn, err = s.s.Begin() c.Assert(err, IsNil) - defer txn.Commit() + defer txn.Commit(goctx.Background()) mustGet(c, txn) mustDel(c, txn) @@ -195,12 +196,12 @@ func (s *testKVSuite) TestSeek(c *C) { checkSeek(c, txn) // Check transaction results - err = txn.Commit() + err = txn.Commit(goctx.Background()) c.Assert(err, IsNil) txn, err = s.s.Begin() c.Assert(err, IsNil) - defer txn.Commit() + defer txn.Commit(goctx.Background()) checkSeek(c, txn) mustDel(c, txn) @@ -217,7 +218,7 @@ func (s *testKVSuite) TestInc(c *C) { c.Assert(n, Equals, int64(100)) // Check transaction results - err = txn.Commit() + err = txn.Commit(goctx.Background()) c.Assert(err, IsNil) txn, err = s.s.Begin() @@ -237,7 +238,7 @@ func (s *testKVSuite) TestInc(c *C) { err = txn.Delete(key) c.Assert(err, IsNil) - err = txn.Commit() + err = txn.Commit(goctx.Background()) c.Assert(err, IsNil) } @@ -251,7 +252,7 @@ func (s *testKVSuite) TestDelete(c *C) { mustDel(c, txn) mustNotGet(c, txn) - txn.Commit() + txn.Commit(goctx.Background()) // Try get txn, err = s.s.Begin() @@ -261,20 +262,20 @@ func (s *testKVSuite) TestDelete(c *C) { // Insert again insertData(c, txn) - txn.Commit() + txn.Commit(goctx.Background()) // Delete all txn, err = s.s.Begin() c.Assert(err, IsNil) mustDel(c, txn) - txn.Commit() + txn.Commit(goctx.Background()) txn, err = s.s.Begin() c.Assert(err, IsNil) mustNotGet(c, txn) - txn.Commit() + txn.Commit(goctx.Background()) } func (s *testKVSuite) TestDelete2(c *C) { @@ -286,7 +287,7 @@ func (s *testKVSuite) TestDelete2(c *C) { txn.Set([]byte("DATA_test_tbl_department_record__0000000001_0004"), val) txn.Set([]byte("DATA_test_tbl_department_record__0000000002_0003"), val) txn.Set([]byte("DATA_test_tbl_department_record__0000000002_0004"), val) - txn.Commit() + txn.Commit(goctx.Background()) // Delete all txn, err = s.s.Begin() @@ -300,19 +301,19 @@ func (s *testKVSuite) TestDelete2(c *C) { err = it.Next() c.Assert(err, IsNil) } - txn.Commit() + txn.Commit(goctx.Background()) txn, err = s.s.Begin() c.Assert(err, IsNil) it, _ = txn.Seek([]byte("DATA_test_tbl_department_record__000000000")) c.Assert(it.Valid(), IsFalse) - txn.Commit() + txn.Commit(goctx.Background()) } func (s *testKVSuite) TestSetNil(c *C) { defer testleak.AfterTest(c)() txn, err := s.s.Begin() - defer txn.Commit() + defer txn.Commit(goctx.Background()) c.Assert(err, IsNil) err = txn.Set([]byte("1"), nil) c.Assert(err, NotNil) @@ -323,10 +324,10 @@ func (s *testKVSuite) TestBasicSeek(c *C) { txn, err := s.s.Begin() c.Assert(err, IsNil) txn.Set([]byte("1"), []byte("1")) - txn.Commit() + txn.Commit(goctx.Background()) txn, err = s.s.Begin() c.Assert(err, IsNil) - defer txn.Commit() + defer txn.Commit(goctx.Background()) it, err := txn.Seek([]byte("2")) c.Assert(err, IsNil) @@ -342,10 +343,10 @@ func (s *testKVSuite) TestBasicTable(c *C) { b := []byte(strconv.Itoa(i)) txn.Set(b, b) } - txn.Commit() + txn.Commit(goctx.Background()) txn, err = s.s.Begin() c.Assert(err, IsNil) - defer txn.Commit() + defer txn.Commit(goctx.Background()) err = txn.Set([]byte("1"), []byte("1")) c.Assert(err, IsNil) @@ -402,7 +403,7 @@ func (s *testKVSuite) TestRollback(c *C) { txn, err = s.s.Begin() c.Assert(err, IsNil) - defer txn.Commit() + defer txn.Commit(goctx.Background()) for i := startIndex; i < testCount; i++ { _, err := txn.Get([]byte(strconv.Itoa(i))) @@ -461,7 +462,7 @@ func (s *testKVSuite) TestConditionIfNotExist(c *C) { if err != nil { return } - err = txn.Commit() + err = txn.Commit(goctx.Background()) if err == nil { atomic.AddInt64(&success, 1) } @@ -476,7 +477,7 @@ func (s *testKVSuite) TestConditionIfNotExist(c *C) { c.Assert(err, IsNil) err = txn.Delete(b) c.Assert(err, IsNil) - err = txn.Commit() + err = txn.Commit(goctx.Background()) c.Assert(err, IsNil) } @@ -491,7 +492,7 @@ func (s *testKVSuite) TestConditionIfEqual(c *C) { txn, err := s.s.Begin() c.Assert(err, IsNil) txn.Set(b, b) - err = txn.Commit() + err = txn.Commit(goctx.Background()) c.Assert(err, IsNil) for i := 0; i < cnt; i++ { @@ -502,7 +503,7 @@ func (s *testKVSuite) TestConditionIfEqual(c *C) { txn1, err1 := s.s.Begin() c.Assert(err1, IsNil) txn1.Set(b, []byte("newValue")) - err1 = txn1.Commit() + err1 = txn1.Commit(goctx.Background()) if err1 == nil { atomic.AddInt64(&success, 1) } @@ -516,7 +517,7 @@ func (s *testKVSuite) TestConditionIfEqual(c *C) { c.Assert(err, IsNil) err = txn.Delete(b) c.Assert(err, IsNil) - err = txn.Commit() + err = txn.Commit(goctx.Background()) c.Assert(err, IsNil) } @@ -526,7 +527,7 @@ func (s *testKVSuite) TestConditionUpdate(c *C) { c.Assert(err, IsNil) txn.Delete([]byte("b")) kv.IncInt64(txn, []byte("a"), 1) - err = txn.Commit() + err = txn.Commit(goctx.Background()) c.Assert(err, IsNil) } @@ -545,7 +546,7 @@ func (s *testKVSuite) TestDBClose(c *C) { err = txn.Set([]byte("a"), []byte("b")) c.Assert(err, IsNil) - err = txn.Commit() + err = txn.Commit(goctx.Background()) c.Assert(err, IsNil) ver, err := store.CurrentVersion() @@ -573,7 +574,7 @@ func (s *testKVSuite) TestDBClose(c *C) { err = txn.Set([]byte("a"), []byte("b")) c.Assert(err, IsNil) - err = txn.Commit() + err = txn.Commit(goctx.Background()) c.Assert(err, NotNil) } @@ -649,7 +650,7 @@ func (s *testKVSuite) TestIsolationInc(c *C) { // delete txn, err := s.s.Begin() c.Assert(err, IsNil) - defer txn.Commit() + defer txn.Commit(goctx.Background()) txn.Delete([]byte("key")) } diff --git a/store/tikv/2pc.go b/store/tikv/2pc.go index 46e64b032c..1e28e44b14 100644 --- a/store/tikv/2pc.go +++ b/store/tikv/2pc.go @@ -22,6 +22,7 @@ import ( log "github.com/Sirupsen/logrus" "github.com/juju/errors" + "github.com/opentracing/opentracing-go" pb "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/sessionctx/binloginfo" @@ -541,7 +542,7 @@ func (c *twoPhaseCommitter) cleanupKeys(bo *Backoffer, keys [][]byte) error { const maxTxnTimeUse = 590000 // execute executes the two-phase commit protocol. -func (c *twoPhaseCommitter) execute() error { +func (c *twoPhaseCommitter) execute(ctx goctx.Context) error { defer func() { // Always clean up all written keys if the txn does not commit. c.mu.RLock() @@ -561,7 +562,20 @@ func (c *twoPhaseCommitter) execute() error { } }() - ctx := goctx.Background() + span := opentracing.SpanFromContext(ctx) + if span != nil { + span = opentracing.StartSpan("twoPhaseCommit.execute", opentracing.ChildOf(span.Context())) + } else { + // If we lost the trace information, make a new one for 2PC commit. + span = opentracing.StartSpan("twoPhaseCommit.execute") + } + defer span.Finish() + + // I'm not sure is it safe to cancel 2pc commit process at any time, + // So use a new Background() context instead of inherit the ctx, this is by design, + // to avoid the cancel signal from parent context. + ctx = opentracing.ContextWithSpan(goctx.Background(), span) + binlogChan := c.prewriteBinlog() err := c.prewriteKeys(NewBackoffer(prewriteMaxBackoff, ctx), c.keys) if binlogChan != nil { diff --git a/store/tikv/2pc_test.go b/store/tikv/2pc_test.go index c6d276c211..9d18c30db6 100644 --- a/store/tikv/2pc_test.go +++ b/store/tikv/2pc_test.go @@ -74,7 +74,7 @@ func (s *testCommitterSuite) mustCommit(c *C, m map[string]string) { err := txn.Set([]byte(k), []byte(v)) c.Assert(err, IsNil) } - err := txn.Commit() + err := txn.Commit(goctx.Background()) c.Assert(err, IsNil) s.checkValues(c, m) @@ -108,7 +108,7 @@ func (s *testCommitterSuite) TestCommitRollback(c *C) { "c": "c2", }) - err := txn.Commit() + err := txn.Commit(goctx.Background()) c.Assert(err, NotNil) s.checkValues(c, map[string]string{ @@ -192,7 +192,7 @@ func (s *testCommitterSuite) TestContextCancelRetryable(c *C) { // txn3 writes "c" err = txn3.Set([]byte("c"), []byte("c3")) c.Assert(err, IsNil) - err = txn3.Commit() + err = txn3.Commit(goctx.Background()) c.Assert(err, IsNil) // txn2 writes "a"(PK), "b", "c" on different regions. // "c" will return a retryable error. @@ -203,7 +203,7 @@ func (s *testCommitterSuite) TestContextCancelRetryable(c *C) { c.Assert(err, IsNil) err = txn2.Set([]byte("c"), []byte("c2")) c.Assert(err, IsNil) - err = txn2.Commit() + err = txn2.Commit(goctx.Background()) c.Assert(err, NotNil) c.Assert(strings.Contains(err.Error(), txnRetryableMark), IsTrue) } @@ -249,7 +249,7 @@ func (s *testCommitterSuite) TestPrewriteCancel(c *C) { // txn2 writes "b" err := txn2.Set([]byte("b"), []byte("b2")) c.Assert(err, IsNil) - err = txn2.Commit() + err = txn2.Commit(goctx.Background()) c.Assert(err, IsNil) // txn1 writes "a"(PK), "b", "c" on different regions. // "b" will return an error and cancel commit. @@ -259,7 +259,7 @@ func (s *testCommitterSuite) TestPrewriteCancel(c *C) { c.Assert(err, IsNil) err = txn1.Set([]byte("c"), []byte("c1")) c.Assert(err, IsNil) - err = txn1.Commit() + err = txn1.Commit(goctx.Background()) c.Assert(err, NotNil) // "c" should be cleaned up in reasonable time. for i := 0; i < 50; i++ { @@ -299,7 +299,7 @@ func (s *testCommitterSuite) TestIllegalTso(c *C) { } // make start ts bigger. txn.startTS = uint64(math.MaxUint64) - err := txn.Commit() + err := txn.Commit(goctx.Background()) c.Assert(err, NotNil) } @@ -325,7 +325,7 @@ func (s *testCommitterSuite) TestPrewritePrimaryKeyFailed(c *C) { txn1 := s.begin(c) err := txn1.Set([]byte("a"), []byte("a1")) c.Assert(err, IsNil) - err = txn1.Commit() + err = txn1.Commit(goctx.Background()) c.Assert(err, IsNil) // check a @@ -342,7 +342,7 @@ func (s *testCommitterSuite) TestPrewritePrimaryKeyFailed(c *C) { err = txn2.Set([]byte("b"), []byte("b2")) c.Assert(err, IsNil) // prewrite:primary a failed, b success - err = txn2.Commit() + err = txn2.Commit(goctx.Background()) c.Assert(err, NotNil) // txn2 failed with a rollback for record a. @@ -368,7 +368,7 @@ func (s *testCommitterSuite) TestPrewritePrimaryKeyFailed(c *C) { // update data in a new txn, should be success. err = txn.Set([]byte("a"), []byte("a3")) c.Assert(err, IsNil) - err = txn.Commit() + err = txn.Commit(goctx.Background()) c.Assert(err, IsNil) // check value txn = s.begin(c) @@ -404,7 +404,7 @@ func (s *testCommitterSuite) TestCommitPrimaryRpcErrors(c *C) { t1 := s.begin(c) err := t1.Set([]byte("a"), []byte("a1")) c.Assert(err, IsNil) - err = t1.Commit() + err = t1.Commit(goctx.Background()) c.Assert(err, NotNil) // TODO: refine errors of region cache and rpc, so that every the rpc error // could be easily wrapped to ErrResultUndetermined, but RegionError would not. @@ -431,7 +431,7 @@ func (s *testCommitterSuite) TestCommitPrimaryRegionError(c *C) { t2 := s.begin(c) err := t2.Set([]byte("b"), []byte("b1")) c.Assert(err, IsNil) - err = t2.Commit() + err = t2.Commit(goctx.Background()) c.Assert(err, NotNil) c.Assert(terror.ErrorNotEqual(err, terror.ErrResultUndetermined), IsTrue) } @@ -454,7 +454,7 @@ func (s *testCommitterSuite) TestCommitPrimaryKeyError(c *C) { t3 := s.begin(c) err := t3.Set([]byte("c"), []byte("c1")) c.Assert(err, IsNil) - err = t3.Commit() + err = t3.Commit(goctx.Background()) c.Assert(err, NotNil) c.Assert(terror.ErrorNotEqual(err, terror.ErrResultUndetermined), IsTrue) } @@ -482,7 +482,7 @@ func (s *testCommitterSuite) TestCommitTimeout(c *C) { c.Assert(err, IsNil) err = txn.Set([]byte("c"), []byte("c1")) c.Assert(err, IsNil) - err = txn.Commit() + err = txn.Commit(goctx.Background()) c.Assert(err, NotNil) txn2 := s.begin(c) diff --git a/store/tikv/isolation_test.go b/store/tikv/isolation_test.go index 2a65f1a7ec..60f3ceeffb 100644 --- a/store/tikv/isolation_test.go +++ b/store/tikv/isolation_test.go @@ -24,6 +24,7 @@ import ( . "github.com/pingcap/check" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/terror" + goctx "golang.org/x/net/context" ) // The test suite takes too long under the race detector. @@ -60,7 +61,7 @@ func (s *testIsolationSuite) SetWithRetry(c *C, k, v []byte) writeRecord { err = txn.Set(k, v) c.Assert(err, IsNil) - err = txn.Commit() + err = txn.Commit(goctx.Background()) if err == nil { return writeRecord{ startTS: txn.StartTS(), diff --git a/store/tikv/lock_test.go b/store/tikv/lock_test.go index c84f073c20..60205bfe7c 100644 --- a/store/tikv/lock_test.go +++ b/store/tikv/lock_test.go @@ -82,7 +82,7 @@ func (s *testLockSuite) putKV(c *C, key, value []byte) (uint64, uint64) { c.Assert(err, IsNil) err = txn.Set(key, value) c.Assert(err, IsNil) - err = txn.Commit() + err = txn.Commit(goctx.Background()) c.Assert(err, IsNil) return txn.StartTS(), txn.(*tikvTxn).commitTS } @@ -157,7 +157,7 @@ func (s *testLockSuite) TestCleanLock(c *C) { err = txn.Set([]byte{ch}, []byte{ch + 1}) c.Assert(err, IsNil) } - err = txn.Commit() + err = txn.Commit(goctx.Background()) c.Assert(err, IsNil) } diff --git a/store/tikv/mock-tikv/cluster_test.go b/store/tikv/mock-tikv/cluster_test.go index 40533bff29..b2c42f719b 100644 --- a/store/tikv/mock-tikv/cluster_test.go +++ b/store/tikv/mock-tikv/cluster_test.go @@ -27,6 +27,7 @@ import ( "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/util/codec" "github.com/pingcap/tidb/util/types" + goctx "golang.org/x/net/context" ) var _ = Suite(&testClusterSuite{}) @@ -67,7 +68,7 @@ func (s *testClusterSuite) TestClusterSplit(c *C) { txn.Set(idxKey, []byte{'0'}) handle++ } - err = txn.Commit() + err = txn.Commit(goctx.Background()) c.Assert(err, IsNil) // Split Table into 10 regions. diff --git a/store/tikv/safepoint_test.go b/store/tikv/safepoint_test.go index a7a19d8b36..ea920f8e8d 100644 --- a/store/tikv/safepoint_test.go +++ b/store/tikv/safepoint_test.go @@ -22,6 +22,7 @@ import ( "github.com/pingcap/tidb" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/terror" + goctx "golang.org/x/net/context" ) type testSafePointSuite struct { @@ -85,7 +86,7 @@ func (s *testSafePointSuite) TestSafePoint(c *C) { err := txn.Set(encodeKey(s.prefix, s08d("key", i)), valueBytes(i)) c.Assert(err, IsNil) } - err := txn.Commit() + err := txn.Commit(goctx.Background()) c.Assert(err, IsNil) // for txn get diff --git a/store/tikv/scan_mock_test.go b/store/tikv/scan_mock_test.go index b24209bb9b..fb5a7efb41 100644 --- a/store/tikv/scan_mock_test.go +++ b/store/tikv/scan_mock_test.go @@ -16,6 +16,7 @@ package tikv import ( . "github.com/pingcap/check" "github.com/pingcap/tidb/kv" + goctx "golang.org/x/net/context" ) type testScanMockSuite struct { @@ -35,7 +36,7 @@ func (s *testScanMockSuite) TestScanMultipleRegions(c *C) { err = txn.Set([]byte{ch}, []byte{ch}) c.Assert(err, IsNil) } - err = txn.Commit() + err = txn.Commit(goctx.Background()) c.Assert(err, IsNil) txn, err = store.Begin() diff --git a/store/tikv/scan_test.go b/store/tikv/scan_test.go index 9765de260a..1cbcab2138 100644 --- a/store/tikv/scan_test.go +++ b/store/tikv/scan_test.go @@ -18,6 +18,7 @@ import ( "time" . "github.com/pingcap/check" + goctx "golang.org/x/net/context" ) type testScanSuite struct { @@ -45,7 +46,7 @@ func (s *testScanSuite) TearDownSuite(c *C) { c.Assert(err, IsNil) scanner.Next() } - err = txn.Commit() + err = txn.Commit(goctx.Background()) c.Assert(err, IsNil) err = s.store.Close() c.Assert(err, IsNil) @@ -64,7 +65,7 @@ func (s *testScanSuite) TestSeek(c *C) { err := txn.Set(encodeKey(s.prefix, s08d("key", i)), valueBytes(i)) c.Assert(err, IsNil) } - err := txn.Commit() + err := txn.Commit(goctx.Background()) c.Assert(err, IsNil) txn2 := s.beginTxn(c) diff --git a/store/tikv/snapshot_test.go b/store/tikv/snapshot_test.go index b3d3235857..f3cbd0b433 100644 --- a/store/tikv/snapshot_test.go +++ b/store/tikv/snapshot_test.go @@ -20,6 +20,7 @@ import ( log "github.com/Sirupsen/logrus" . "github.com/pingcap/check" "github.com/pingcap/tidb/kv" + goctx "golang.org/x/net/context" ) type testSnapshotSuite struct { @@ -47,7 +48,7 @@ func (s *testSnapshotSuite) TearDownSuite(c *C) { c.Assert(err, IsNil) scanner.Next() } - err = txn.Commit() + err = txn.Commit(goctx.Background()) c.Assert(err, IsNil) err = s.store.Close() c.Assert(err, IsNil) @@ -77,7 +78,7 @@ func (s *testSnapshotSuite) checkAll(keys []kv.Key, c *C) { c.Assert(v, BytesEquals, v2) scan.Next() } - err = txn.Commit() + err = txn.Commit(goctx.Background()) c.Assert(err, IsNil) c.Assert(m, HasLen, cnt) } @@ -88,7 +89,7 @@ func (s *testSnapshotSuite) deleteKeys(keys []kv.Key, c *C) { err := txn.Delete(k) c.Assert(err, IsNil) } - err := txn.Commit() + err := txn.Commit(goctx.Background()) c.Assert(err, IsNil) } @@ -101,7 +102,7 @@ func (s *testSnapshotSuite) TestBatchGet(c *C) { err := txn.Set(k, valueBytes(i)) c.Assert(err, IsNil) } - err := txn.Commit() + err := txn.Commit(goctx.Background()) c.Assert(err, IsNil) keys := makeKeys(rowNum, s.prefix) @@ -119,7 +120,7 @@ func (s *testSnapshotSuite) TestBatchGetNotExist(c *C) { err := txn.Set(k, valueBytes(i)) c.Assert(err, IsNil) } - err := txn.Commit() + err := txn.Commit(goctx.Background()) c.Assert(err, IsNil) keys := makeKeys(rowNum, s.prefix) diff --git a/store/tikv/split_test.go b/store/tikv/split_test.go index 4044735623..cc324dfa1e 100644 --- a/store/tikv/split_test.go +++ b/store/tikv/split_test.go @@ -18,6 +18,7 @@ import ( "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/store/tikv/mock-tikv" "golang.org/x/net/context" + goctx "golang.org/x/net/context" ) type testSplitSuite struct { @@ -83,7 +84,7 @@ func (s *testSplitSuite) TestStaleEpoch(c *C) { c.Assert(err, IsNil) err = txn.Set([]byte("c"), []byte("c")) c.Assert(err, IsNil) - err = txn.Commit() + err = txn.Commit(goctx.Background()) c.Assert(err, IsNil) // Initiate a split and disable the PD client. If it still works, the diff --git a/store/tikv/store_test.go b/store/tikv/store_test.go index ac7a2b48f8..4b038be3a2 100644 --- a/store/tikv/store_test.go +++ b/store/tikv/store_test.go @@ -97,7 +97,7 @@ func (s *testStoreSuite) TestBusyServerKV(c *C) { c.Assert(err, IsNil) err = txn.Set([]byte("key"), []byte("value")) c.Assert(err, IsNil) - err = txn.Commit() + err = txn.Commit(goctx.Background()) c.Assert(err, IsNil) var wg sync.WaitGroup @@ -362,7 +362,7 @@ func (s *testStoreSuite) TestRequestPriority(c *C) { txn.SetOption(kv.Priority, kv.PriorityHigh) err = txn.Set([]byte("key"), []byte("value")) c.Assert(err, IsNil) - err = txn.Commit() + err = txn.Commit(goctx.Background()) c.Assert(err, IsNil) // Cover the basic Get request. diff --git a/store/tikv/ticlient_test.go b/store/tikv/ticlient_test.go index 5ae439b868..88846231f7 100644 --- a/store/tikv/ticlient_test.go +++ b/store/tikv/ticlient_test.go @@ -22,6 +22,7 @@ import ( "github.com/pingcap/tidb" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/util/codec" + goctx "golang.org/x/net/context" ) var ( @@ -78,7 +79,7 @@ func (s *testTiclientSuite) TearDownSuite(c *C) { c.Assert(err, IsNil) scanner.Next() } - err = txn.Commit() + err = txn.Commit(goctx.Background()) c.Assert(err, IsNil) err = s.store.Close() c.Assert(err, IsNil) @@ -96,7 +97,7 @@ func (s *testTiclientSuite) TestSingleKey(c *C) { c.Assert(err, IsNil) err = txn.LockKeys(encodeKey(s.prefix, "key")) c.Assert(err, IsNil) - err = txn.Commit() + err = txn.Commit(goctx.Background()) c.Assert(err, IsNil) txn = s.beginTxn(c) @@ -107,7 +108,7 @@ func (s *testTiclientSuite) TestSingleKey(c *C) { txn = s.beginTxn(c) err = txn.Delete(encodeKey(s.prefix, "key")) c.Assert(err, IsNil) - err = txn.Commit() + err = txn.Commit(goctx.Background()) c.Assert(err, IsNil) } @@ -119,7 +120,7 @@ func (s *testTiclientSuite) TestMultiKeys(c *C) { err := txn.Set(encodeKey(s.prefix, s08d("key", i)), valueBytes(i)) c.Assert(err, IsNil) } - err := txn.Commit() + err := txn.Commit(goctx.Background()) c.Assert(err, IsNil) txn = s.beginTxn(c) @@ -134,7 +135,7 @@ func (s *testTiclientSuite) TestMultiKeys(c *C) { err = txn.Delete(encodeKey(s.prefix, s08d("key", i))) c.Assert(err, IsNil) } - err = txn.Commit() + err = txn.Commit(goctx.Background()) c.Assert(err, IsNil) } @@ -149,7 +150,7 @@ func (s *testTiclientSuite) TestLargeRequest(c *C) { txn := s.beginTxn(c) err := txn.Set([]byte("key"), largeValue) c.Assert(err, IsNil) - err = txn.Commit() + err = txn.Commit(goctx.Background()) c.Assert(err, NotNil) c.Assert(kv.IsRetryableError(err), IsFalse) } diff --git a/store/tikv/txn.go b/store/tikv/txn.go index c011729a04..6be17ebec1 100644 --- a/store/tikv/txn.go +++ b/store/tikv/txn.go @@ -140,7 +140,7 @@ func (txn *tikvTxn) DelOption(opt kv.Option) { } } -func (txn *tikvTxn) Commit() error { +func (txn *tikvTxn) Commit(ctx goctx.Context) error { if !txn.valid { return kv.ErrInvalidTxn } @@ -162,7 +162,7 @@ func (txn *tikvTxn) Commit() error { if committer == nil { return nil } - err = committer.execute() + err = committer.execute(ctx) if err != nil { committer.writeFinishBinlog(binlog.BinlogType_Rollback, 0) return errors.Trace(err) diff --git a/structure/structure_test.go b/structure/structure_test.go index 3fa64872e4..7d373ce353 100644 --- a/structure/structure_test.go +++ b/structure/structure_test.go @@ -21,6 +21,7 @@ import ( "github.com/pingcap/tidb/store/localstore" "github.com/pingcap/tidb/store/localstore/goleveldb" "github.com/pingcap/tidb/util/testleak" + goctx "golang.org/x/net/context" ) func TestTxStructure(t *testing.T) { @@ -85,7 +86,7 @@ func (s *testTxStructureSuite) TestString(c *C) { c.Assert(err, IsNil) c.Assert(v, IsNil) - err = txn.Commit() + err = txn.Commit(goctx.Background()) c.Assert(err, IsNil) } @@ -168,7 +169,7 @@ func (s *testTxStructureSuite) TestList(c *C) { c.Assert(err, IsNil) c.Assert(l, Equals, int64(0)) - err = txn.Commit() + err = txn.Commit(goctx.Background()) c.Assert(err, IsNil) } @@ -328,7 +329,7 @@ func (s *testTxStructureSuite) TestHash(c *C) { c.Assert(err, IsNil) c.Assert(value, DeepEquals, []byte("2")) - err = txn.Commit() + err = txn.Commit(goctx.Background()) c.Assert(err, IsNil) err = kv.RunInNewTxn(s.store, false, func(txn kv.Transaction) error { diff --git a/table/tables/index_test.go b/table/tables/index_test.go index 0e0a9c0d3a..96cebdfff9 100644 --- a/table/tables/index_test.go +++ b/table/tables/index_test.go @@ -24,6 +24,7 @@ import ( "github.com/pingcap/tidb/terror" "github.com/pingcap/tidb/util/testleak" "github.com/pingcap/tidb/util/types" + goctx "golang.org/x/net/context" ) var _ = Suite(&testIndexSuite{}) @@ -125,7 +126,7 @@ func (s *testIndexSuite) TestIndex(c *C) { c.Assert(terror.ErrorEqual(err, io.EOF), IsTrue) it.Close() - err = txn.Commit() + err = txn.Commit(goctx.Background()) c.Assert(err, IsNil) tblInfo = &model.TableInfo{ @@ -175,7 +176,7 @@ func (s *testIndexSuite) TestIndex(c *C) { c.Assert(h, Equals, int64(1)) c.Assert(exist, IsTrue) - err = txn.Commit() + err = txn.Commit(goctx.Background()) c.Assert(err, IsNil) _, err = index.FetchValues(make([]types.Datum, 0)) diff --git a/table/tables/tables_test.go b/table/tables/tables_test.go index ed259d62aa..1b2690500b 100644 --- a/table/tables/tables_test.go +++ b/table/tables/tables_test.go @@ -30,6 +30,7 @@ import ( "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/testleak" "github.com/pingcap/tidb/util/types" + goctx "golang.org/x/net/context" ) func TestT(t *testing.T) { @@ -275,7 +276,7 @@ func (ts *testSuite) TestUnsignedPK(c *C) { c.Assert(err, IsNil) c.Assert(len(row), Equals, 2) c.Assert(row[0].Kind(), Equals, types.KindUint64) - c.Assert(ctx.Txn().Commit(), IsNil) + c.Assert(ctx.Txn().Commit(goctx.Background()), IsNil) } func (ts *testSuite) TestIterRecords(c *C) { @@ -298,7 +299,7 @@ func (ts *testSuite) TestIterRecords(c *C) { }) c.Assert(err, IsNil) c.Assert(totalCount, Equals, 2) - c.Assert(ctx.Txn().Commit(), IsNil) + c.Assert(ctx.Txn().Commit(goctx.Background()), IsNil) } func (ts *testSuite) TestTableFromMeta(c *C) { diff --git a/util/admin/admin_test.go b/util/admin/admin_test.go index a9e1e9b346..dffd49605b 100644 --- a/util/admin/admin_test.go +++ b/util/admin/admin_test.go @@ -33,6 +33,7 @@ import ( "github.com/pingcap/tidb/util/mock" "github.com/pingcap/tidb/util/testleak" "github.com/pingcap/tidb/util/types" + goctx "golang.org/x/net/context" ) func TestT(t *testing.T) { @@ -115,7 +116,7 @@ func (s *testSuite) SetUpSuite(c *C) { err = t.CreateTable(s.dbInfo.ID, s.tbInfo) c.Assert(err, IsNil) - err = txn.Commit() + err = txn.Commit(goctx.Background()) c.Assert(err, IsNil) } @@ -128,7 +129,7 @@ func (s *testSuite) TearDownSuite(c *C) { c.Assert(err, IsNil) err = t.DropDatabase(s.dbInfo.ID) c.Assert(err, IsNil) - err = txn.Commit() + err = txn.Commit(goctx.Background()) c.Assert(err, IsNil) err = s.store.Close() @@ -282,7 +283,7 @@ func (s *testSuite) TestScan(c *C) { c.Assert(s.ctx.NewTxn(), IsNil) _, err = tb.AddRecord(s.ctx, types.MakeDatums(1, 10, 11)) c.Assert(err, IsNil) - c.Assert(s.ctx.Txn().Commit(), IsNil) + c.Assert(s.ctx.Txn().Commit(goctx.Background()), IsNil) record1 := &RecordData{Handle: int64(1), Values: types.MakeDatums(int64(1), int64(10), int64(11))} record2 := &RecordData{Handle: int64(2), Values: types.MakeDatums(int64(2), int64(20), int64(21))} @@ -295,7 +296,7 @@ func (s *testSuite) TestScan(c *C) { c.Assert(s.ctx.NewTxn(), IsNil) _, err = tb.AddRecord(s.ctx, record2.Values) c.Assert(err, IsNil) - c.Assert(s.ctx.Txn().Commit(), IsNil) + c.Assert(s.ctx.Txn().Commit(goctx.Background()), IsNil) txn, err := s.store.Begin() c.Assert(err, IsNil) @@ -337,7 +338,7 @@ func (s *testSuite) TestScan(c *C) { c.Assert(err, IsNil) err = tb.RemoveRecord(s.ctx, 2, record2.Values) c.Assert(err, IsNil) - c.Assert(s.ctx.Txn().Commit(), IsNil) + c.Assert(s.ctx.Txn().Commit(goctx.Background()), IsNil) } func newDiffRetError(prefix string, ra, rb *RecordData) string { @@ -412,7 +413,7 @@ func (s *testSuite) testIndex(c *C, tb table.Table, idx table.Index) { c.Assert(err, IsNil) key := tablecodec.EncodeRowKey(tb.Meta().ID, codec.EncodeInt(nil, 4)) setColValue(c, txn, key, types.NewDatum(int64(40))) - err = txn.Commit() + err = txn.Commit(goctx.Background()) c.Assert(err, IsNil) txn, err = s.store.Begin() @@ -430,7 +431,7 @@ func (s *testSuite) testIndex(c *C, tb table.Table, idx table.Index) { c.Assert(err, IsNil) key = tablecodec.EncodeRowKey(tb.Meta().ID, codec.EncodeInt(nil, 3)) setColValue(c, txn, key, types.NewDatum(int64(31))) - err = txn.Commit() + err = txn.Commit(goctx.Background()) c.Assert(err, IsNil) txn, err = s.store.Begin() @@ -448,7 +449,7 @@ func (s *testSuite) testIndex(c *C, tb table.Table, idx table.Index) { txn.Delete(key) key = tablecodec.EncodeRowKey(tb.Meta().ID, codec.EncodeInt(nil, 5)) setColValue(c, txn, key, types.NewDatum(int64(30))) - err = txn.Commit() + err = txn.Commit(goctx.Background()) c.Assert(err, IsNil) txn, err = s.store.Begin() @@ -466,7 +467,7 @@ func (s *testSuite) testIndex(c *C, tb table.Table, idx table.Index) { txn.Delete(key) key = tablecodec.EncodeRowKey(tb.Meta().ID, codec.EncodeInt(nil, 3)) setColValue(c, txn, key, types.NewDatum(int64(30))) - err = txn.Commit() + err = txn.Commit(goctx.Background()) c.Assert(err, IsNil) txn, err = s.store.Begin() @@ -484,7 +485,7 @@ func (s *testSuite) testIndex(c *C, tb table.Table, idx table.Index) { c.Assert(err, IsNil) key = tablecodec.EncodeRowKey(tb.Meta().ID, codec.EncodeInt(nil, 4)) setColValue(c, txn, key, types.NewDatum(int64(40))) - err = txn.Commit() + err = txn.Commit(goctx.Background()) c.Assert(err, IsNil) txn, err = s.store.Begin() diff --git a/util/mock/context.go b/util/mock/context.go index fa882bbc80..75a31162d9 100644 --- a/util/mock/context.go +++ b/util/mock/context.go @@ -106,7 +106,7 @@ func (c *Context) NewTxn() error { return errors.New("store is not set") } if c.txn != nil && c.txn.Valid() { - err := c.txn.Commit() + err := c.txn.Commit(c.ctx) if err != nil { return errors.Trace(err) } diff --git a/util/prefix_helper_test.go b/util/prefix_helper_test.go index 425c1bb086..81f9397b7b 100644 --- a/util/prefix_helper_test.go +++ b/util/prefix_helper_test.go @@ -23,6 +23,7 @@ import ( "github.com/pingcap/tidb/store/localstore/goleveldb" "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/testleak" + goctx "golang.org/x/net/context" ) const ( @@ -116,7 +117,7 @@ func (c *MockContext) CommitTxn() error { if c.txn == nil { return nil } - return c.txn.Commit() + return c.txn.Commit(goctx.Background()) } func (s *testPrefixSuite) TestPrefix(c *C) { @@ -139,7 +140,7 @@ func (s *testPrefixSuite) TestPrefix(c *C) { return true }) c.Assert(err, IsNil) - err = txn.Commit() + err = txn.Commit(goctx.Background()) c.Assert(err, IsNil) }