*: opentracing for two phase commit (#4900)

This commit is contained in:
tiancaiamao
2017-10-26 04:43:45 -05:00
committed by Ewan Chou
parent 27eedda835
commit 37681cf435
43 changed files with 210 additions and 168 deletions

View File

@ -25,6 +25,7 @@ import (
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/util/auth"
"github.com/pingcap/tidb/util/testleak"
goctx "golang.org/x/net/context"
)
var _ = Suite(&testBootstrapSuite{})
@ -195,7 +196,7 @@ func (s *testBootstrapSuite) TestUpgrade(c *C) {
m := meta.NewMeta(txn)
err = m.FinishBootstrap(int64(1))
c.Assert(err, IsNil)
err = txn.Commit()
err = txn.Commit(goctx.Background())
c.Assert(err, IsNil)
mustExecSQL(c, se1, `delete from mysql.TiDB where VARIABLE_NAME="tidb_server_version";`)
mustExecSQL(c, se1, fmt.Sprintf(`delete from mysql.global_variables where VARIABLE_NAME="%s";`,

View File

@ -30,6 +30,7 @@ import (
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/terror"
"github.com/prometheus/client_golang/prometheus"
goctx "golang.org/x/net/context"
)
var (
@ -102,7 +103,7 @@ func batchRW(value []byte) {
key := fmt.Sprintf("key_%d", k)
err = txn.Set([]byte(key), value)
terror.Log(errors.Trace(err))
err = txn.Commit()
err = txn.Commit(goctx.Background())
if err != nil {
txnRolledbackCounter.WithLabelValues("txn").Inc()
terror.Call(txn.Rollback)

View File

@ -68,7 +68,7 @@ func (s *testColumnChangeSuite) TestColumnChange(c *C) {
row := types.MakeDatums(1, 2)
h, err := originTable.AddRecord(ctx, row)
c.Assert(err, IsNil)
err = ctx.Txn().Commit()
err = ctx.Txn().Commit(goctx.Background())
c.Assert(err, IsNil)
var mu sync.Mutex
@ -120,7 +120,7 @@ func (s *testColumnChangeSuite) TestColumnChange(c *C) {
}
mu.Unlock()
}
err = hookCtx.Txn().Commit()
err = hookCtx.Txn().Commit(goctx.Background())
if err != nil {
checkErr = errors.Trace(err)
}
@ -172,7 +172,7 @@ func (s *testColumnChangeSuite) testAddColumnNoDefault(c *C, ctx context.Context
checkErr = errors.Trace(err)
}
}
err = hookCtx.Txn().Commit()
err = hookCtx.Txn().Commit(ctx.GoCtx())
if err != nil {
checkErr = errors.Trace(err)
}

View File

@ -261,7 +261,7 @@ func (s *testColumnSuite) checkColumnKVExist(ctx context.Context, t table.Table,
if err != nil {
return errors.Trace(err)
}
defer ctx.Txn().Commit()
defer ctx.Txn().Commit(goctx.Background())
key := t.RecordKey(handle)
data, err := ctx.Txn().Get(key)
if !isExist {
@ -751,7 +751,7 @@ func (s *testColumnSuite) TestAddColumn(c *C) {
handle, err := t.AddRecord(ctx, oldRow)
c.Assert(err, IsNil)
err = ctx.Txn().Commit()
err = ctx.Txn().Commit(goctx.Background())
c.Assert(err, IsNil)
newColName := "c4"
@ -814,7 +814,7 @@ func (s *testColumnSuite) TestAddColumn(c *C) {
job = testDropTable(c, ctx, d, s.dbInfo, tblInfo)
testCheckJobDone(c, d, job, false)
err = ctx.Txn().Commit()
err = ctx.Txn().Commit(goctx.Background())
c.Assert(err, IsNil)
d.Stop()
@ -839,7 +839,7 @@ func (s *testColumnSuite) TestDropColumn(c *C) {
_, err = t.AddRecord(ctx, append(row, types.NewDatum(defaultColValue)))
c.Assert(err, IsNil)
err = ctx.Txn().Commit()
err = ctx.Txn().Commit(goctx.Background())
c.Assert(err, IsNil)
checkOK := false
@ -888,7 +888,7 @@ func (s *testColumnSuite) TestDropColumn(c *C) {
job = testDropTable(c, ctx, d, s.dbInfo, tblInfo)
testCheckJobDone(c, d, job, false)
err = ctx.Txn().Commit()
err = ctx.Txn().Commit(goctx.Background())
c.Assert(err, IsNil)
d.Stop()

View File

@ -46,6 +46,7 @@ import (
"github.com/pingcap/tidb/util/testleak"
"github.com/pingcap/tidb/util/testutil"
"github.com/pingcap/tidb/util/types"
goctx "golang.org/x/net/context"
)
const (
@ -343,7 +344,7 @@ func (s *testDBSuite) TestCancelAddIndex(c *C) {
checkErr = errors.Trace(errs[0])
return
}
err = hookCtx.Txn().Commit()
err = hookCtx.Txn().Commit(goctx.Background())
if err != nil {
checkErr = errors.Trace(err)
}

View File

@ -324,7 +324,7 @@ func (s *testDDLSuite) TestCancelJob(c *C) {
row := types.MakeDatums(1, 2)
_, err = originTable.AddRecord(ctx, row)
c.Assert(err, IsNil)
err = ctx.Txn().Commit()
err = ctx.Txn().Commit(goctx.Background())
c.Assert(err, IsNil)
tc := &TestDDLCallback{}
@ -346,7 +346,7 @@ func (s *testDDLSuite) TestCancelJob(c *C) {
return
}
checkCancelState(hookCtx.Txn(), job, test)
err = hookCtx.Txn().Commit()
err = hookCtx.Txn().Commit(goctx.Background())
if err != nil {
checkErr = errors.Trace(err)
return
@ -374,7 +374,7 @@ func (s *testDDLSuite) TestCancelJob(c *C) {
test = &tests[3]
testCreateIndex(c, ctx, d, dbInfo, tblInfo, false, "idx", "c2")
c.Check(errors.ErrorStack(checkErr), Equals, "")
c.Assert(ctx.Txn().Commit(), IsNil)
c.Assert(ctx.Txn().Commit(goctx.Background()), IsNil)
// for dropping index
idxName := []interface{}{model.NewCIStr("idx")}

View File

@ -126,7 +126,7 @@ func (s *testForeighKeySuite) TestForeignKey(c *C) {
testCreateTable(c, ctx, d, s.dbInfo, tblInfo)
err = ctx.Txn().Commit()
err = ctx.Txn().Commit(goctx.Background())
c.Assert(err, IsNil)
// fix data race
@ -160,7 +160,7 @@ func (s *testForeighKeySuite) TestForeignKey(c *C) {
job := s.testCreateForeignKey(c, tblInfo, "c1_fk", []string{"c1"}, "t2", []string{"c1"}, ast.ReferOptionCascade, ast.ReferOptionSetNull)
testCheckJobDone(c, d, job, true)
err = ctx.Txn().Commit()
err = ctx.Txn().Commit(goctx.Background())
c.Assert(err, IsNil)
mu.Lock()
hErr := hookErr
@ -218,6 +218,6 @@ func (s *testForeighKeySuite) TestForeignKey(c *C) {
job = testDropTable(c, ctx, d, s.dbInfo, tblInfo)
testCheckJobDone(c, d, job, false)
err = ctx.Txn().Commit()
err = ctx.Txn().Commit(goctx.Background())
c.Assert(err, IsNil)
}

View File

@ -69,7 +69,7 @@ func (s *testIndexChangeSuite) TestIndexChange(c *C) {
_, err = originTable.AddRecord(ctx, types.MakeDatums(3, 3))
c.Assert(err, IsNil)
err = ctx.Txn().Commit()
err = ctx.Txn().Commit(goctx.Background())
c.Assert(err, IsNil)
tc := &TestDDLCallback{}
@ -120,7 +120,7 @@ func (s *testIndexChangeSuite) TestIndexChange(c *C) {
d.SetHook(tc)
testCreateIndex(c, ctx, d, s.dbInfo, originTable.Meta(), false, "c2", "c2")
c.Check(errors.ErrorStack(checkErr), Equals, "")
c.Assert(ctx.Txn().Commit(), IsNil)
c.Assert(ctx.Txn().Commit(goctx.Background()), IsNil)
d.Stop()
prevState = model.StateNone
var noneTable table.Table
@ -317,7 +317,7 @@ func (s *testIndexChangeSuite) checkAddPublic(d *ddl, ctx context.Context, write
return errors.Trace(err)
}
}
return ctx.Txn().Commit()
return ctx.Txn().Commit(goctx.Background())
}
func (s *testIndexChangeSuite) checkDropWriteOnly(d *ddl, ctx context.Context, publicTbl, writeTbl table.Table) error {
@ -357,7 +357,7 @@ func (s *testIndexChangeSuite) checkDropWriteOnly(d *ddl, ctx context.Context, p
if err != nil {
return errors.Trace(err)
}
return ctx.Txn().Commit()
return ctx.Txn().Commit(goctx.Background())
}
func (s *testIndexChangeSuite) checkDropDeleteOnly(d *ddl, ctx context.Context, writeTbl, delTbl table.Table) error {
@ -402,5 +402,5 @@ func (s *testIndexChangeSuite) checkDropDeleteOnly(d *ddl, ctx context.Context,
if err != nil {
return errors.Trace(err)
}
return ctx.Txn().Commit()
return ctx.Txn().Commit(goctx.Background())
}

View File

@ -58,7 +58,7 @@ func (s *testDDLSuite) TestReorg(c *C) {
err = ctx.NewTxn()
c.Assert(err, IsNil)
ctx.Txn().Set([]byte("a"), []byte("b"))
err = ctx.Txn().Commit()
err = ctx.Txn().Commit(goctx.Background())
c.Assert(err, IsNil)
rowCount := int64(10)
@ -150,7 +150,7 @@ func (s *testDDLSuite) TestReorgOwner(c *C) {
c.Assert(err, IsNil)
}
err := ctx.Txn().Commit()
err := ctx.Txn().Commit(goctx.Background())
c.Assert(err, IsNil)
tc := &TestDDLCallback{}

View File

@ -184,7 +184,7 @@ func (s *testSuite) TestAdmin(c *C) {
c.Assert(tb.Indices(), HasLen, 1)
_, err = tb.Indices()[0].Create(txn, types.MakeDatums(int64(10)), 1)
c.Assert(err, IsNil)
err = txn.Commit()
err = txn.Commit(goctx.Background())
c.Assert(err, IsNil)
r, err = tk.Exec("admin check table admin_test")
c.Assert(err, NotNil)
@ -222,7 +222,7 @@ func checkCases(tests []testCase, ld *executor.LoadDataInfo,
c.Assert(data, DeepEquals, tt.restData,
Commentf("data1:%v, data2:%v, data:%v", string(tt.data1), string(tt.data2), string(data)))
}
err1 = ctx.Txn().Commit()
err1 = ctx.Txn().Commit(goctx.Background())
c.Assert(err1, IsNil)
r := tk.MustQuery(selectSQL)
r.Check(testutil.RowsWithSep("|", tt.expected...))

View File

@ -211,7 +211,7 @@ func (e *SimpleExec) executeAlterUser(s *ast.AlterUserStmt) error {
}
if len(failedUsers) > 0 {
// Commit the transaction even if we returns error
err := e.ctx.Txn().Commit()
err := e.ctx.Txn().Commit(e.ctx.GoCtx())
if err != nil {
return errors.Trace(err)
}
@ -242,7 +242,7 @@ func (e *SimpleExec) executeDropUser(s *ast.DropUserStmt) error {
}
if len(failedUsers) > 0 {
// Commit the transaction even if we returns error
err := e.ctx.Txn().Commit()
err := e.ctx.Txn().Commit(e.ctx.GoCtx())
if err != nil {
return errors.Trace(err)
}

View File

@ -13,7 +13,11 @@
package kv
import "sync"
import (
"sync"
goctx "golang.org/x/net/context"
)
// InjectionConfig is used for fault injections for KV components.
type InjectionConfig struct {
@ -95,13 +99,13 @@ func (t *InjectedTransaction) Get(k Key) ([]byte, error) {
}
// Commit returns an error if cfg.commitError is set.
func (t *InjectedTransaction) Commit() error {
func (t *InjectedTransaction) Commit(ctx goctx.Context) error {
t.cfg.RLock()
defer t.cfg.RUnlock()
if t.cfg.commitError != nil {
return t.cfg.commitError
}
return t.Transaction.Commit()
return t.Transaction.Commit(ctx)
}
// InjectedSnapshot wraps a Snapshot with injections.

View File

@ -116,7 +116,7 @@ type MemBuffer interface {
type Transaction interface {
MemBuffer
// Commit commits the transaction operations to KV store.
Commit() error
Commit(goctx.Context) error
// Rollback undoes the transaction operations to KV store.
Rollback() error
// String implements fmt.Stringer interface.

View File

@ -16,6 +16,7 @@ package kv
import (
"github.com/juju/errors"
"github.com/pingcap/tidb/store/tikv/oracle"
goctx "golang.org/x/net/context"
)
// mockTxn is a txn that returns a retryAble error when called Commit.
@ -25,7 +26,7 @@ type mockTxn struct {
}
// Always returns a retryable error.
func (t *mockTxn) Commit() error {
func (t *mockTxn) Commit(ctx goctx.Context) error {
return ErrRetryable
}

View File

@ -15,6 +15,7 @@ package kv
import (
. "github.com/pingcap/check"
goctx "golang.org/x/net/context"
)
var _ = Suite(testMockSuite{})
@ -48,7 +49,7 @@ func (s testMockSuite) TestInterface(c *C) {
transaction.Seek(Key("lock"))
transaction.SeekReverse(Key("lock"))
}
transaction.Commit()
transaction.Commit(goctx.Background())
transaction, err = storage.Begin()
c.Check(err, IsNil)

View File

@ -21,6 +21,7 @@ import (
log "github.com/Sirupsen/logrus"
"github.com/juju/errors"
"github.com/pingcap/tidb/terror"
goctx "golang.org/x/net/context"
)
// RunInNewTxn will run the f in a new transaction environment.
@ -54,7 +55,7 @@ func RunInNewTxn(store Storage, retryable bool, f func(txn Transaction) error) e
return errors.Trace(err)
}
err = txn.Commit()
err = txn.Commit(goctx.Background())
if retryable && IsRetryableError(err) {
log.Warnf("[kv] Retry txn %v original txn %v err %v", txn, originalTxnTS, err)
err1 := txn.Rollback()

View File

@ -23,6 +23,7 @@ import (
"github.com/pingcap/tidb/store/localstore"
"github.com/pingcap/tidb/store/localstore/goleveldb"
"github.com/pingcap/tidb/util/testleak"
goctx "golang.org/x/net/context"
)
func TestT(t *testing.T) {
@ -233,7 +234,7 @@ func (s *testSuite) TestMeta(c *C) {
readDiff, err := t.GetSchemaDiff(schemaDiff.Version)
c.Assert(readDiff, DeepEquals, schemaDiff)
err = txn.Commit()
err = txn.Commit(goctx.Background())
c.Assert(err, IsNil)
}
@ -249,7 +250,7 @@ func (s *testSuite) TestSnapshot(c *C) {
m.GenGlobalID()
n, _ := m.GetGlobalID()
c.Assert(n, Equals, int64(1))
txn.Commit()
txn.Commit(goctx.Background())
ver1, _ := store.CurrentVersion()
time.Sleep(time.Millisecond)
@ -258,7 +259,7 @@ func (s *testSuite) TestSnapshot(c *C) {
m.GenGlobalID()
n, _ = m.GetGlobalID()
c.Assert(n, Equals, int64(2))
txn.Commit()
txn.Commit(goctx.Background())
snapshot, _ := store.GetSnapshot(ver1)
snapMeta := meta.NewSnapshotMeta(snapshot)
@ -327,6 +328,6 @@ func (s *testSuite) TestDDL(c *C) {
lastID = job.ID
}
err = txn.Commit()
err = txn.Commit(goctx.Background())
c.Assert(err, IsNil)
}

View File

@ -720,7 +720,7 @@ func (cc *clientConn) handleLoadData(loadDataInfo *executor.LoadDataInfo) error
}
return errors.Trace(err)
}
return errors.Trace(txn.Commit())
return errors.Trace(txn.Commit(loadDataInfo.Ctx.GoCtx()))
}
// handleQuery executes the sql query string and writes result set or result ok to the client.

View File

@ -292,7 +292,7 @@ func (s *session) doCommit() error {
schemaVer: s.sessionVars.TxnCtx.SchemaVersion,
relatedTableIDs: tableIDs,
})
if err := s.txn.Commit(); err != nil {
if err := s.txn.Commit(s.GoCtx()); err != nil {
return errors.Trace(err)
}
return nil

View File

@ -20,6 +20,7 @@ import (
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/localstore/engine"
"github.com/pingcap/tidb/util/testleak"
goctx "golang.org/x/net/context"
)
var _ = Suite(&testLocalstoreCompactorSuite{})
@ -60,22 +61,22 @@ func (s *testLocalstoreCompactorSuite) TestCompactor(c *C) {
txn, _ := store.Begin()
txn.Set([]byte("a"), []byte("1"))
txn.Commit()
txn.Commit(goctx.Background())
txn, _ = store.Begin()
txn.Set([]byte("a"), []byte("2"))
txn.Commit()
txn.Commit(goctx.Background())
txn, _ = store.Begin()
txn.Set([]byte("a"), []byte("3"))
txn.Commit()
txn.Commit(goctx.Background())
txn, _ = store.Begin()
txn.Set([]byte("a"), []byte("3"))
txn.Commit()
txn.Commit(goctx.Background())
txn, _ = store.Begin()
txn.Set([]byte("a"), []byte("4"))
txn.Commit()
txn.Commit(goctx.Background())
txn, _ = store.Begin()
txn.Set([]byte("a"), []byte("5"))
txn.Commit()
txn.Commit(goctx.Background())
t := count(db)
c.Assert(t, Equals, 6)
@ -84,7 +85,7 @@ func (s *testLocalstoreCompactorSuite) TestCompactor(c *C) {
// Touch a, tigger GC
txn, _ = store.Begin()
txn.Set([]byte("a"), []byte("b"))
txn.Commit()
txn.Commit(goctx.Background())
time.Sleep(1 * time.Second)
// Do background GC
t = count(db)
@ -100,16 +101,16 @@ func (s *testLocalstoreCompactorSuite) TestGetAllVersions(c *C) {
compactor := store.(*dbStore).compactor
txn, _ := store.Begin()
txn.Set([]byte("a"), []byte("1"))
txn.Commit()
txn.Commit(goctx.Background())
txn, _ = store.Begin()
txn.Set([]byte("a"), []byte("2"))
txn.Commit()
txn.Commit(goctx.Background())
txn, _ = store.Begin()
txn.Set([]byte("b"), []byte("1"))
txn.Commit()
txn.Commit(goctx.Background())
txn, _ = store.Begin()
txn.Set([]byte("b"), []byte("2"))
txn.Commit()
txn.Commit(goctx.Background())
keys, err := compactor.getAllVersions([]byte("a"))
c.Assert(err, IsNil)

View File

@ -21,6 +21,7 @@ import (
. "github.com/pingcap/check"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/localstore/goleveldb"
goctx "golang.org/x/net/context"
)
var _ = Suite(&testMvccSuite{})
@ -82,7 +83,7 @@ func (t *testMvccSuite) SetUpTest(c *C) {
err := txn.Set(val, val)
c.Assert(err, IsNil)
}
txn.Commit()
txn.Commit(goctx.Background())
}
func (t *testMvccSuite) TearDownSuite(c *C) {
@ -99,7 +100,7 @@ func (t *testMvccSuite) TestMvccGet(c *C) {
k = encodeInt(1024)
_, err = txn.Get(k)
c.Assert(err, NotNil)
txn.Commit()
txn.Commit(goctx.Background())
}
func (t *testMvccSuite) TestMvccPutAndDel(c *C) {
@ -111,7 +112,7 @@ func (t *testMvccSuite) TestMvccPutAndDel(c *C) {
err = txn.Delete(val)
c.Assert(err, IsNil)
}
txn.Commit()
txn.Commit(goctx.Background())
txn, _ = t.s.Begin()
_, err = txn.Get(encodeInt(0))
@ -119,7 +120,7 @@ func (t *testMvccSuite) TestMvccPutAndDel(c *C) {
v, err := txn.Get(encodeInt(4))
c.Assert(err, IsNil)
c.Assert(len(v), Greater, 0)
txn.Commit()
txn.Commit(goctx.Background())
cnt := 0
t.scanRawEngine(c, func(k, v []byte) {
@ -129,7 +130,7 @@ func (t *testMvccSuite) TestMvccPutAndDel(c *C) {
txn.Set(encodeInt(0), []byte("v"))
_, err = txn.Get(encodeInt(0))
c.Assert(err, IsNil)
txn.Commit()
txn.Commit(goctx.Background())
cnt1 := 0
t.scanRawEngine(c, func(k, v []byte) {
@ -147,7 +148,7 @@ func (t *testMvccSuite) TestMvccNext(c *C) {
err = it.Next()
c.Assert(err, IsNil)
}
txn.Commit()
txn.Commit(goctx.Background())
}
func encodeTestDataKey(i int) []byte {
@ -158,7 +159,7 @@ func (t *testMvccSuite) TestSnapshotGet(c *C) {
tx, _ := t.s.Begin()
b, err := tx.Get(encodeInt(1))
c.Assert(err, IsNil)
tx.Commit()
tx.Commit(goctx.Background())
lastVer, err := globalVersionProvider.CurrentVersion()
c.Assert(err, IsNil)
@ -166,7 +167,7 @@ func (t *testMvccSuite) TestSnapshotGet(c *C) {
tx, _ = t.s.Begin()
err = tx.Set(encodeInt(1), []byte("new"))
c.Assert(err, IsNil)
err = tx.Commit()
err = tx.Commit(goctx.Background())
c.Assert(err, IsNil)
testKey := encodeTestDataKey(1)
@ -222,7 +223,7 @@ func (t *testMvccSuite) TestMvccSeek(c *C) {
c.Assert(err, IsNil)
err = txn.Set(encodeInt(3), encodeInt(1003))
c.Assert(err, IsNil)
err = txn.Commit()
err = txn.Commit(goctx.Background())
c.Assert(err, IsNil)
v1, err := globalVersionProvider.CurrentVersion()
c.Assert(err, IsNil)
@ -231,7 +232,7 @@ func (t *testMvccSuite) TestMvccSeek(c *C) {
c.Assert(err, IsNil)
err = txn.Delete(encodeInt(2))
c.Assert(err, IsNil)
err = txn.Commit()
err = txn.Commit(goctx.Background())
c.Assert(err, IsNil)
v2, err := globalVersionProvider.CurrentVersion()
c.Assert(err, IsNil)
@ -274,7 +275,7 @@ func (t *testMvccSuite) TestReverseMvccSeek(c *C) {
c.Assert(err, IsNil)
err = txn.Set(encodeInt(3), encodeInt(1003))
c.Assert(err, IsNil)
err = txn.Commit()
err = txn.Commit(goctx.Background())
c.Assert(err, IsNil)
v1, err := globalVersionProvider.CurrentVersion()
c.Assert(err, IsNil)
@ -283,7 +284,7 @@ func (t *testMvccSuite) TestReverseMvccSeek(c *C) {
c.Assert(err, IsNil)
err = txn.Delete(encodeInt(4))
c.Assert(err, IsNil)
err = txn.Commit()
err = txn.Commit(goctx.Background())
c.Assert(err, IsNil)
v2, err := globalVersionProvider.CurrentVersion()
c.Assert(err, IsNil)
@ -313,7 +314,7 @@ func (t *testMvccSuite) TestMvccSuiteGetLatest(c *C) {
tx, _ := t.s.Begin()
err := tx.Set(encodeInt(5), encodeInt(100+i))
c.Assert(err, IsNil)
err = tx.Commit()
err = tx.Commit(goctx.Background())
c.Assert(err, IsNil)
}
// we can always read newest data
@ -326,24 +327,24 @@ func (t *testMvccSuite) TestMvccSuiteGetLatest(c *C) {
c.Assert(err, IsNil)
c.Assert(it.Valid(), IsTrue)
c.Assert(string(it.Value()), Equals, string(encodeInt(100+9)))
tx.Commit()
tx.Commit(goctx.Background())
testKey := []byte("testKey")
txn0, _ := t.s.Begin()
txn0.Set(testKey, []byte("0"))
txn0.Commit()
txn0.Commit(goctx.Background())
txn1, _ := t.s.Begin()
{
// Commit another version
txn2, _ := t.s.Begin()
txn2.Set(testKey, []byte("2"))
txn2.Commit()
txn2.Commit(goctx.Background())
}
r, err := txn1.Get(testKey)
c.Assert(err, IsNil)
// Test isolation in transaction.
c.Assert(string(r), Equals, "0")
txn1.Commit()
txn1.Commit(goctx.Background())
}
func (t *testMvccSuite) TestBufferedIterator(c *C) {
@ -355,7 +356,7 @@ func (t *testMvccSuite) TestBufferedIterator(c *C) {
tx.Set([]byte{0x0, 0xee, 0xff}, []byte("4"))
tx.Set([]byte{0xff, 0xff, 0xee, 0xff}, []byte("5"))
tx.Set([]byte{0xff, 0xff, 0xff}, []byte("6"))
tx.Commit()
tx.Commit(goctx.Background())
tx, _ = s.Begin()
iter, err := tx.Seek([]byte{0})
@ -366,7 +367,7 @@ func (t *testMvccSuite) TestBufferedIterator(c *C) {
c.Assert(err, IsNil)
cnt++
}
tx.Commit()
tx.Commit(goctx.Background())
c.Assert(cnt, Equals, 6)
tx, _ = s.Begin()
@ -374,7 +375,7 @@ func (t *testMvccSuite) TestBufferedIterator(c *C) {
c.Assert(err, IsNil)
c.Assert(it.Valid(), IsTrue)
c.Assert(string(it.Key()), Equals, "\xff\xff\xee\xff")
tx.Commit()
tx.Commit(goctx.Background())
// no such key
tx, _ = s.Begin()
@ -386,7 +387,7 @@ func (t *testMvccSuite) TestBufferedIterator(c *C) {
c.Assert(err, IsNil)
c.Assert(it.Valid(), IsTrue)
c.Assert(it.Value(), DeepEquals, []byte("2"))
tx.Commit()
tx.Commit(goctx.Background())
tx, _ = s.Begin()
iter, err = tx.SeekReverse(nil)
@ -397,7 +398,7 @@ func (t *testMvccSuite) TestBufferedIterator(c *C) {
c.Assert(err, IsNil)
cnt++
}
tx.Commit()
tx.Commit(goctx.Background())
c.Assert(cnt, Equals, 6)
tx, _ = s.Begin()
@ -405,7 +406,7 @@ func (t *testMvccSuite) TestBufferedIterator(c *C) {
c.Assert(err, IsNil)
c.Assert(it.Valid(), IsTrue)
c.Assert(string(it.Key()), Equals, "\xff\xff\xee\xff")
tx.Commit()
tx.Commit(goctx.Background())
// no such key
tx, _ = s.Begin()
@ -417,7 +418,7 @@ func (t *testMvccSuite) TestBufferedIterator(c *C) {
c.Assert(err, IsNil)
c.Assert(it.Valid(), IsTrue)
c.Assert(it.Value(), DeepEquals, []byte("1"))
tx.Commit()
tx.Commit(goctx.Background())
}
func encodeInt(n int) []byte {

View File

@ -20,6 +20,7 @@ import (
"github.com/juju/errors"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/terror"
goctx "golang.org/x/net/context"
)
var (
@ -115,7 +116,7 @@ func (txn *dbTxn) doCommit() error {
return txn.store.CommitTxn(txn)
}
func (txn *dbTxn) Commit() error {
func (txn *dbTxn) Commit(goctx.Context) error {
if !txn.valid {
return errors.Trace(kv.ErrInvalidTxn)
}

View File

@ -86,7 +86,7 @@ func (s *testXAPISuite) TestSelect(c *C) {
c.Assert([]byte(chunk.RowsData[dataOffset:dataOffset+rowMeta.Length]), BytesEquals, expectedEncoded)
dataOffset += rowMeta.Length
}
txn.Commit()
txn.Commit(goctx.Background())
// Select Index request.
txn, err = store.Begin()
@ -109,7 +109,7 @@ func (s *testXAPISuite) TestSelect(c *C) {
for i, h := range handles {
c.Assert(h, Equals, i+1)
}
txn.Commit()
txn.Commit(goctx.Background())
store.Close()
}
@ -187,7 +187,7 @@ func prepareTableData(store kv.Storage, tbl *simpleTableInfo, count int64, gen g
for i := int64(1); i <= count; i++ {
setRow(txn, i, tbl, gen)
}
return txn.Commit()
return txn.Commit(goctx.Background())
}
func setRow(txn kv.Transaction, handle int64, tbl *simpleTableInfo, gen genValueFunc) error {

View File

@ -30,6 +30,7 @@ import (
"github.com/pingcap/tidb/store/localstore/goleveldb"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/testleak"
goctx "golang.org/x/net/context"
)
var (
@ -175,12 +176,12 @@ func (s *testKVSuite) TestGetSet(c *C) {
mustGet(c, txn)
// Check transaction results
err = txn.Commit()
err = txn.Commit(goctx.Background())
c.Assert(err, IsNil)
txn, err = s.s.Begin()
c.Assert(err, IsNil)
defer txn.Commit()
defer txn.Commit(goctx.Background())
mustGet(c, txn)
mustDel(c, txn)
@ -195,12 +196,12 @@ func (s *testKVSuite) TestSeek(c *C) {
checkSeek(c, txn)
// Check transaction results
err = txn.Commit()
err = txn.Commit(goctx.Background())
c.Assert(err, IsNil)
txn, err = s.s.Begin()
c.Assert(err, IsNil)
defer txn.Commit()
defer txn.Commit(goctx.Background())
checkSeek(c, txn)
mustDel(c, txn)
@ -217,7 +218,7 @@ func (s *testKVSuite) TestInc(c *C) {
c.Assert(n, Equals, int64(100))
// Check transaction results
err = txn.Commit()
err = txn.Commit(goctx.Background())
c.Assert(err, IsNil)
txn, err = s.s.Begin()
@ -237,7 +238,7 @@ func (s *testKVSuite) TestInc(c *C) {
err = txn.Delete(key)
c.Assert(err, IsNil)
err = txn.Commit()
err = txn.Commit(goctx.Background())
c.Assert(err, IsNil)
}
@ -251,7 +252,7 @@ func (s *testKVSuite) TestDelete(c *C) {
mustDel(c, txn)
mustNotGet(c, txn)
txn.Commit()
txn.Commit(goctx.Background())
// Try get
txn, err = s.s.Begin()
@ -261,20 +262,20 @@ func (s *testKVSuite) TestDelete(c *C) {
// Insert again
insertData(c, txn)
txn.Commit()
txn.Commit(goctx.Background())
// Delete all
txn, err = s.s.Begin()
c.Assert(err, IsNil)
mustDel(c, txn)
txn.Commit()
txn.Commit(goctx.Background())
txn, err = s.s.Begin()
c.Assert(err, IsNil)
mustNotGet(c, txn)
txn.Commit()
txn.Commit(goctx.Background())
}
func (s *testKVSuite) TestDelete2(c *C) {
@ -286,7 +287,7 @@ func (s *testKVSuite) TestDelete2(c *C) {
txn.Set([]byte("DATA_test_tbl_department_record__0000000001_0004"), val)
txn.Set([]byte("DATA_test_tbl_department_record__0000000002_0003"), val)
txn.Set([]byte("DATA_test_tbl_department_record__0000000002_0004"), val)
txn.Commit()
txn.Commit(goctx.Background())
// Delete all
txn, err = s.s.Begin()
@ -300,19 +301,19 @@ func (s *testKVSuite) TestDelete2(c *C) {
err = it.Next()
c.Assert(err, IsNil)
}
txn.Commit()
txn.Commit(goctx.Background())
txn, err = s.s.Begin()
c.Assert(err, IsNil)
it, _ = txn.Seek([]byte("DATA_test_tbl_department_record__000000000"))
c.Assert(it.Valid(), IsFalse)
txn.Commit()
txn.Commit(goctx.Background())
}
func (s *testKVSuite) TestSetNil(c *C) {
defer testleak.AfterTest(c)()
txn, err := s.s.Begin()
defer txn.Commit()
defer txn.Commit(goctx.Background())
c.Assert(err, IsNil)
err = txn.Set([]byte("1"), nil)
c.Assert(err, NotNil)
@ -323,10 +324,10 @@ func (s *testKVSuite) TestBasicSeek(c *C) {
txn, err := s.s.Begin()
c.Assert(err, IsNil)
txn.Set([]byte("1"), []byte("1"))
txn.Commit()
txn.Commit(goctx.Background())
txn, err = s.s.Begin()
c.Assert(err, IsNil)
defer txn.Commit()
defer txn.Commit(goctx.Background())
it, err := txn.Seek([]byte("2"))
c.Assert(err, IsNil)
@ -342,10 +343,10 @@ func (s *testKVSuite) TestBasicTable(c *C) {
b := []byte(strconv.Itoa(i))
txn.Set(b, b)
}
txn.Commit()
txn.Commit(goctx.Background())
txn, err = s.s.Begin()
c.Assert(err, IsNil)
defer txn.Commit()
defer txn.Commit(goctx.Background())
err = txn.Set([]byte("1"), []byte("1"))
c.Assert(err, IsNil)
@ -402,7 +403,7 @@ func (s *testKVSuite) TestRollback(c *C) {
txn, err = s.s.Begin()
c.Assert(err, IsNil)
defer txn.Commit()
defer txn.Commit(goctx.Background())
for i := startIndex; i < testCount; i++ {
_, err := txn.Get([]byte(strconv.Itoa(i)))
@ -461,7 +462,7 @@ func (s *testKVSuite) TestConditionIfNotExist(c *C) {
if err != nil {
return
}
err = txn.Commit()
err = txn.Commit(goctx.Background())
if err == nil {
atomic.AddInt64(&success, 1)
}
@ -476,7 +477,7 @@ func (s *testKVSuite) TestConditionIfNotExist(c *C) {
c.Assert(err, IsNil)
err = txn.Delete(b)
c.Assert(err, IsNil)
err = txn.Commit()
err = txn.Commit(goctx.Background())
c.Assert(err, IsNil)
}
@ -491,7 +492,7 @@ func (s *testKVSuite) TestConditionIfEqual(c *C) {
txn, err := s.s.Begin()
c.Assert(err, IsNil)
txn.Set(b, b)
err = txn.Commit()
err = txn.Commit(goctx.Background())
c.Assert(err, IsNil)
for i := 0; i < cnt; i++ {
@ -502,7 +503,7 @@ func (s *testKVSuite) TestConditionIfEqual(c *C) {
txn1, err1 := s.s.Begin()
c.Assert(err1, IsNil)
txn1.Set(b, []byte("newValue"))
err1 = txn1.Commit()
err1 = txn1.Commit(goctx.Background())
if err1 == nil {
atomic.AddInt64(&success, 1)
}
@ -516,7 +517,7 @@ func (s *testKVSuite) TestConditionIfEqual(c *C) {
c.Assert(err, IsNil)
err = txn.Delete(b)
c.Assert(err, IsNil)
err = txn.Commit()
err = txn.Commit(goctx.Background())
c.Assert(err, IsNil)
}
@ -526,7 +527,7 @@ func (s *testKVSuite) TestConditionUpdate(c *C) {
c.Assert(err, IsNil)
txn.Delete([]byte("b"))
kv.IncInt64(txn, []byte("a"), 1)
err = txn.Commit()
err = txn.Commit(goctx.Background())
c.Assert(err, IsNil)
}
@ -545,7 +546,7 @@ func (s *testKVSuite) TestDBClose(c *C) {
err = txn.Set([]byte("a"), []byte("b"))
c.Assert(err, IsNil)
err = txn.Commit()
err = txn.Commit(goctx.Background())
c.Assert(err, IsNil)
ver, err := store.CurrentVersion()
@ -573,7 +574,7 @@ func (s *testKVSuite) TestDBClose(c *C) {
err = txn.Set([]byte("a"), []byte("b"))
c.Assert(err, IsNil)
err = txn.Commit()
err = txn.Commit(goctx.Background())
c.Assert(err, NotNil)
}
@ -649,7 +650,7 @@ func (s *testKVSuite) TestIsolationInc(c *C) {
// delete
txn, err := s.s.Begin()
c.Assert(err, IsNil)
defer txn.Commit()
defer txn.Commit(goctx.Background())
txn.Delete([]byte("key"))
}

View File

@ -22,6 +22,7 @@ import (
log "github.com/Sirupsen/logrus"
"github.com/juju/errors"
"github.com/opentracing/opentracing-go"
pb "github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/sessionctx/binloginfo"
@ -541,7 +542,7 @@ func (c *twoPhaseCommitter) cleanupKeys(bo *Backoffer, keys [][]byte) error {
const maxTxnTimeUse = 590000
// execute executes the two-phase commit protocol.
func (c *twoPhaseCommitter) execute() error {
func (c *twoPhaseCommitter) execute(ctx goctx.Context) error {
defer func() {
// Always clean up all written keys if the txn does not commit.
c.mu.RLock()
@ -561,7 +562,20 @@ func (c *twoPhaseCommitter) execute() error {
}
}()
ctx := goctx.Background()
span := opentracing.SpanFromContext(ctx)
if span != nil {
span = opentracing.StartSpan("twoPhaseCommit.execute", opentracing.ChildOf(span.Context()))
} else {
// If we lost the trace information, make a new one for 2PC commit.
span = opentracing.StartSpan("twoPhaseCommit.execute")
}
defer span.Finish()
// I'm not sure is it safe to cancel 2pc commit process at any time,
// So use a new Background() context instead of inherit the ctx, this is by design,
// to avoid the cancel signal from parent context.
ctx = opentracing.ContextWithSpan(goctx.Background(), span)
binlogChan := c.prewriteBinlog()
err := c.prewriteKeys(NewBackoffer(prewriteMaxBackoff, ctx), c.keys)
if binlogChan != nil {

View File

@ -74,7 +74,7 @@ func (s *testCommitterSuite) mustCommit(c *C, m map[string]string) {
err := txn.Set([]byte(k), []byte(v))
c.Assert(err, IsNil)
}
err := txn.Commit()
err := txn.Commit(goctx.Background())
c.Assert(err, IsNil)
s.checkValues(c, m)
@ -108,7 +108,7 @@ func (s *testCommitterSuite) TestCommitRollback(c *C) {
"c": "c2",
})
err := txn.Commit()
err := txn.Commit(goctx.Background())
c.Assert(err, NotNil)
s.checkValues(c, map[string]string{
@ -192,7 +192,7 @@ func (s *testCommitterSuite) TestContextCancelRetryable(c *C) {
// txn3 writes "c"
err = txn3.Set([]byte("c"), []byte("c3"))
c.Assert(err, IsNil)
err = txn3.Commit()
err = txn3.Commit(goctx.Background())
c.Assert(err, IsNil)
// txn2 writes "a"(PK), "b", "c" on different regions.
// "c" will return a retryable error.
@ -203,7 +203,7 @@ func (s *testCommitterSuite) TestContextCancelRetryable(c *C) {
c.Assert(err, IsNil)
err = txn2.Set([]byte("c"), []byte("c2"))
c.Assert(err, IsNil)
err = txn2.Commit()
err = txn2.Commit(goctx.Background())
c.Assert(err, NotNil)
c.Assert(strings.Contains(err.Error(), txnRetryableMark), IsTrue)
}
@ -249,7 +249,7 @@ func (s *testCommitterSuite) TestPrewriteCancel(c *C) {
// txn2 writes "b"
err := txn2.Set([]byte("b"), []byte("b2"))
c.Assert(err, IsNil)
err = txn2.Commit()
err = txn2.Commit(goctx.Background())
c.Assert(err, IsNil)
// txn1 writes "a"(PK), "b", "c" on different regions.
// "b" will return an error and cancel commit.
@ -259,7 +259,7 @@ func (s *testCommitterSuite) TestPrewriteCancel(c *C) {
c.Assert(err, IsNil)
err = txn1.Set([]byte("c"), []byte("c1"))
c.Assert(err, IsNil)
err = txn1.Commit()
err = txn1.Commit(goctx.Background())
c.Assert(err, NotNil)
// "c" should be cleaned up in reasonable time.
for i := 0; i < 50; i++ {
@ -299,7 +299,7 @@ func (s *testCommitterSuite) TestIllegalTso(c *C) {
}
// make start ts bigger.
txn.startTS = uint64(math.MaxUint64)
err := txn.Commit()
err := txn.Commit(goctx.Background())
c.Assert(err, NotNil)
}
@ -325,7 +325,7 @@ func (s *testCommitterSuite) TestPrewritePrimaryKeyFailed(c *C) {
txn1 := s.begin(c)
err := txn1.Set([]byte("a"), []byte("a1"))
c.Assert(err, IsNil)
err = txn1.Commit()
err = txn1.Commit(goctx.Background())
c.Assert(err, IsNil)
// check a
@ -342,7 +342,7 @@ func (s *testCommitterSuite) TestPrewritePrimaryKeyFailed(c *C) {
err = txn2.Set([]byte("b"), []byte("b2"))
c.Assert(err, IsNil)
// prewrite:primary a failed, b success
err = txn2.Commit()
err = txn2.Commit(goctx.Background())
c.Assert(err, NotNil)
// txn2 failed with a rollback for record a.
@ -368,7 +368,7 @@ func (s *testCommitterSuite) TestPrewritePrimaryKeyFailed(c *C) {
// update data in a new txn, should be success.
err = txn.Set([]byte("a"), []byte("a3"))
c.Assert(err, IsNil)
err = txn.Commit()
err = txn.Commit(goctx.Background())
c.Assert(err, IsNil)
// check value
txn = s.begin(c)
@ -404,7 +404,7 @@ func (s *testCommitterSuite) TestCommitPrimaryRpcErrors(c *C) {
t1 := s.begin(c)
err := t1.Set([]byte("a"), []byte("a1"))
c.Assert(err, IsNil)
err = t1.Commit()
err = t1.Commit(goctx.Background())
c.Assert(err, NotNil)
// TODO: refine errors of region cache and rpc, so that every the rpc error
// could be easily wrapped to ErrResultUndetermined, but RegionError would not.
@ -431,7 +431,7 @@ func (s *testCommitterSuite) TestCommitPrimaryRegionError(c *C) {
t2 := s.begin(c)
err := t2.Set([]byte("b"), []byte("b1"))
c.Assert(err, IsNil)
err = t2.Commit()
err = t2.Commit(goctx.Background())
c.Assert(err, NotNil)
c.Assert(terror.ErrorNotEqual(err, terror.ErrResultUndetermined), IsTrue)
}
@ -454,7 +454,7 @@ func (s *testCommitterSuite) TestCommitPrimaryKeyError(c *C) {
t3 := s.begin(c)
err := t3.Set([]byte("c"), []byte("c1"))
c.Assert(err, IsNil)
err = t3.Commit()
err = t3.Commit(goctx.Background())
c.Assert(err, NotNil)
c.Assert(terror.ErrorNotEqual(err, terror.ErrResultUndetermined), IsTrue)
}
@ -482,7 +482,7 @@ func (s *testCommitterSuite) TestCommitTimeout(c *C) {
c.Assert(err, IsNil)
err = txn.Set([]byte("c"), []byte("c1"))
c.Assert(err, IsNil)
err = txn.Commit()
err = txn.Commit(goctx.Background())
c.Assert(err, NotNil)
txn2 := s.begin(c)

View File

@ -24,6 +24,7 @@ import (
. "github.com/pingcap/check"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/terror"
goctx "golang.org/x/net/context"
)
// The test suite takes too long under the race detector.
@ -60,7 +61,7 @@ func (s *testIsolationSuite) SetWithRetry(c *C, k, v []byte) writeRecord {
err = txn.Set(k, v)
c.Assert(err, IsNil)
err = txn.Commit()
err = txn.Commit(goctx.Background())
if err == nil {
return writeRecord{
startTS: txn.StartTS(),

View File

@ -82,7 +82,7 @@ func (s *testLockSuite) putKV(c *C, key, value []byte) (uint64, uint64) {
c.Assert(err, IsNil)
err = txn.Set(key, value)
c.Assert(err, IsNil)
err = txn.Commit()
err = txn.Commit(goctx.Background())
c.Assert(err, IsNil)
return txn.StartTS(), txn.(*tikvTxn).commitTS
}
@ -157,7 +157,7 @@ func (s *testLockSuite) TestCleanLock(c *C) {
err = txn.Set([]byte{ch}, []byte{ch + 1})
c.Assert(err, IsNil)
}
err = txn.Commit()
err = txn.Commit(goctx.Background())
c.Assert(err, IsNil)
}

View File

@ -27,6 +27,7 @@ import (
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/types"
goctx "golang.org/x/net/context"
)
var _ = Suite(&testClusterSuite{})
@ -67,7 +68,7 @@ func (s *testClusterSuite) TestClusterSplit(c *C) {
txn.Set(idxKey, []byte{'0'})
handle++
}
err = txn.Commit()
err = txn.Commit(goctx.Background())
c.Assert(err, IsNil)
// Split Table into 10 regions.

View File

@ -22,6 +22,7 @@ import (
"github.com/pingcap/tidb"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/terror"
goctx "golang.org/x/net/context"
)
type testSafePointSuite struct {
@ -85,7 +86,7 @@ func (s *testSafePointSuite) TestSafePoint(c *C) {
err := txn.Set(encodeKey(s.prefix, s08d("key", i)), valueBytes(i))
c.Assert(err, IsNil)
}
err := txn.Commit()
err := txn.Commit(goctx.Background())
c.Assert(err, IsNil)
// for txn get

View File

@ -16,6 +16,7 @@ package tikv
import (
. "github.com/pingcap/check"
"github.com/pingcap/tidb/kv"
goctx "golang.org/x/net/context"
)
type testScanMockSuite struct {
@ -35,7 +36,7 @@ func (s *testScanMockSuite) TestScanMultipleRegions(c *C) {
err = txn.Set([]byte{ch}, []byte{ch})
c.Assert(err, IsNil)
}
err = txn.Commit()
err = txn.Commit(goctx.Background())
c.Assert(err, IsNil)
txn, err = store.Begin()

View File

@ -18,6 +18,7 @@ import (
"time"
. "github.com/pingcap/check"
goctx "golang.org/x/net/context"
)
type testScanSuite struct {
@ -45,7 +46,7 @@ func (s *testScanSuite) TearDownSuite(c *C) {
c.Assert(err, IsNil)
scanner.Next()
}
err = txn.Commit()
err = txn.Commit(goctx.Background())
c.Assert(err, IsNil)
err = s.store.Close()
c.Assert(err, IsNil)
@ -64,7 +65,7 @@ func (s *testScanSuite) TestSeek(c *C) {
err := txn.Set(encodeKey(s.prefix, s08d("key", i)), valueBytes(i))
c.Assert(err, IsNil)
}
err := txn.Commit()
err := txn.Commit(goctx.Background())
c.Assert(err, IsNil)
txn2 := s.beginTxn(c)

View File

@ -20,6 +20,7 @@ import (
log "github.com/Sirupsen/logrus"
. "github.com/pingcap/check"
"github.com/pingcap/tidb/kv"
goctx "golang.org/x/net/context"
)
type testSnapshotSuite struct {
@ -47,7 +48,7 @@ func (s *testSnapshotSuite) TearDownSuite(c *C) {
c.Assert(err, IsNil)
scanner.Next()
}
err = txn.Commit()
err = txn.Commit(goctx.Background())
c.Assert(err, IsNil)
err = s.store.Close()
c.Assert(err, IsNil)
@ -77,7 +78,7 @@ func (s *testSnapshotSuite) checkAll(keys []kv.Key, c *C) {
c.Assert(v, BytesEquals, v2)
scan.Next()
}
err = txn.Commit()
err = txn.Commit(goctx.Background())
c.Assert(err, IsNil)
c.Assert(m, HasLen, cnt)
}
@ -88,7 +89,7 @@ func (s *testSnapshotSuite) deleteKeys(keys []kv.Key, c *C) {
err := txn.Delete(k)
c.Assert(err, IsNil)
}
err := txn.Commit()
err := txn.Commit(goctx.Background())
c.Assert(err, IsNil)
}
@ -101,7 +102,7 @@ func (s *testSnapshotSuite) TestBatchGet(c *C) {
err := txn.Set(k, valueBytes(i))
c.Assert(err, IsNil)
}
err := txn.Commit()
err := txn.Commit(goctx.Background())
c.Assert(err, IsNil)
keys := makeKeys(rowNum, s.prefix)
@ -119,7 +120,7 @@ func (s *testSnapshotSuite) TestBatchGetNotExist(c *C) {
err := txn.Set(k, valueBytes(i))
c.Assert(err, IsNil)
}
err := txn.Commit()
err := txn.Commit(goctx.Background())
c.Assert(err, IsNil)
keys := makeKeys(rowNum, s.prefix)

View File

@ -18,6 +18,7 @@ import (
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/tikv/mock-tikv"
"golang.org/x/net/context"
goctx "golang.org/x/net/context"
)
type testSplitSuite struct {
@ -83,7 +84,7 @@ func (s *testSplitSuite) TestStaleEpoch(c *C) {
c.Assert(err, IsNil)
err = txn.Set([]byte("c"), []byte("c"))
c.Assert(err, IsNil)
err = txn.Commit()
err = txn.Commit(goctx.Background())
c.Assert(err, IsNil)
// Initiate a split and disable the PD client. If it still works, the

View File

@ -97,7 +97,7 @@ func (s *testStoreSuite) TestBusyServerKV(c *C) {
c.Assert(err, IsNil)
err = txn.Set([]byte("key"), []byte("value"))
c.Assert(err, IsNil)
err = txn.Commit()
err = txn.Commit(goctx.Background())
c.Assert(err, IsNil)
var wg sync.WaitGroup
@ -362,7 +362,7 @@ func (s *testStoreSuite) TestRequestPriority(c *C) {
txn.SetOption(kv.Priority, kv.PriorityHigh)
err = txn.Set([]byte("key"), []byte("value"))
c.Assert(err, IsNil)
err = txn.Commit()
err = txn.Commit(goctx.Background())
c.Assert(err, IsNil)
// Cover the basic Get request.

View File

@ -22,6 +22,7 @@ import (
"github.com/pingcap/tidb"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/util/codec"
goctx "golang.org/x/net/context"
)
var (
@ -78,7 +79,7 @@ func (s *testTiclientSuite) TearDownSuite(c *C) {
c.Assert(err, IsNil)
scanner.Next()
}
err = txn.Commit()
err = txn.Commit(goctx.Background())
c.Assert(err, IsNil)
err = s.store.Close()
c.Assert(err, IsNil)
@ -96,7 +97,7 @@ func (s *testTiclientSuite) TestSingleKey(c *C) {
c.Assert(err, IsNil)
err = txn.LockKeys(encodeKey(s.prefix, "key"))
c.Assert(err, IsNil)
err = txn.Commit()
err = txn.Commit(goctx.Background())
c.Assert(err, IsNil)
txn = s.beginTxn(c)
@ -107,7 +108,7 @@ func (s *testTiclientSuite) TestSingleKey(c *C) {
txn = s.beginTxn(c)
err = txn.Delete(encodeKey(s.prefix, "key"))
c.Assert(err, IsNil)
err = txn.Commit()
err = txn.Commit(goctx.Background())
c.Assert(err, IsNil)
}
@ -119,7 +120,7 @@ func (s *testTiclientSuite) TestMultiKeys(c *C) {
err := txn.Set(encodeKey(s.prefix, s08d("key", i)), valueBytes(i))
c.Assert(err, IsNil)
}
err := txn.Commit()
err := txn.Commit(goctx.Background())
c.Assert(err, IsNil)
txn = s.beginTxn(c)
@ -134,7 +135,7 @@ func (s *testTiclientSuite) TestMultiKeys(c *C) {
err = txn.Delete(encodeKey(s.prefix, s08d("key", i)))
c.Assert(err, IsNil)
}
err = txn.Commit()
err = txn.Commit(goctx.Background())
c.Assert(err, IsNil)
}
@ -149,7 +150,7 @@ func (s *testTiclientSuite) TestLargeRequest(c *C) {
txn := s.beginTxn(c)
err := txn.Set([]byte("key"), largeValue)
c.Assert(err, IsNil)
err = txn.Commit()
err = txn.Commit(goctx.Background())
c.Assert(err, NotNil)
c.Assert(kv.IsRetryableError(err), IsFalse)
}

View File

@ -140,7 +140,7 @@ func (txn *tikvTxn) DelOption(opt kv.Option) {
}
}
func (txn *tikvTxn) Commit() error {
func (txn *tikvTxn) Commit(ctx goctx.Context) error {
if !txn.valid {
return kv.ErrInvalidTxn
}
@ -162,7 +162,7 @@ func (txn *tikvTxn) Commit() error {
if committer == nil {
return nil
}
err = committer.execute()
err = committer.execute(ctx)
if err != nil {
committer.writeFinishBinlog(binlog.BinlogType_Rollback, 0)
return errors.Trace(err)

View File

@ -21,6 +21,7 @@ import (
"github.com/pingcap/tidb/store/localstore"
"github.com/pingcap/tidb/store/localstore/goleveldb"
"github.com/pingcap/tidb/util/testleak"
goctx "golang.org/x/net/context"
)
func TestTxStructure(t *testing.T) {
@ -85,7 +86,7 @@ func (s *testTxStructureSuite) TestString(c *C) {
c.Assert(err, IsNil)
c.Assert(v, IsNil)
err = txn.Commit()
err = txn.Commit(goctx.Background())
c.Assert(err, IsNil)
}
@ -168,7 +169,7 @@ func (s *testTxStructureSuite) TestList(c *C) {
c.Assert(err, IsNil)
c.Assert(l, Equals, int64(0))
err = txn.Commit()
err = txn.Commit(goctx.Background())
c.Assert(err, IsNil)
}
@ -328,7 +329,7 @@ func (s *testTxStructureSuite) TestHash(c *C) {
c.Assert(err, IsNil)
c.Assert(value, DeepEquals, []byte("2"))
err = txn.Commit()
err = txn.Commit(goctx.Background())
c.Assert(err, IsNil)
err = kv.RunInNewTxn(s.store, false, func(txn kv.Transaction) error {

View File

@ -24,6 +24,7 @@ import (
"github.com/pingcap/tidb/terror"
"github.com/pingcap/tidb/util/testleak"
"github.com/pingcap/tidb/util/types"
goctx "golang.org/x/net/context"
)
var _ = Suite(&testIndexSuite{})
@ -125,7 +126,7 @@ func (s *testIndexSuite) TestIndex(c *C) {
c.Assert(terror.ErrorEqual(err, io.EOF), IsTrue)
it.Close()
err = txn.Commit()
err = txn.Commit(goctx.Background())
c.Assert(err, IsNil)
tblInfo = &model.TableInfo{
@ -175,7 +176,7 @@ func (s *testIndexSuite) TestIndex(c *C) {
c.Assert(h, Equals, int64(1))
c.Assert(exist, IsTrue)
err = txn.Commit()
err = txn.Commit(goctx.Background())
c.Assert(err, IsNil)
_, err = index.FetchValues(make([]types.Datum, 0))

View File

@ -30,6 +30,7 @@ import (
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/testleak"
"github.com/pingcap/tidb/util/types"
goctx "golang.org/x/net/context"
)
func TestT(t *testing.T) {
@ -275,7 +276,7 @@ func (ts *testSuite) TestUnsignedPK(c *C) {
c.Assert(err, IsNil)
c.Assert(len(row), Equals, 2)
c.Assert(row[0].Kind(), Equals, types.KindUint64)
c.Assert(ctx.Txn().Commit(), IsNil)
c.Assert(ctx.Txn().Commit(goctx.Background()), IsNil)
}
func (ts *testSuite) TestIterRecords(c *C) {
@ -298,7 +299,7 @@ func (ts *testSuite) TestIterRecords(c *C) {
})
c.Assert(err, IsNil)
c.Assert(totalCount, Equals, 2)
c.Assert(ctx.Txn().Commit(), IsNil)
c.Assert(ctx.Txn().Commit(goctx.Background()), IsNil)
}
func (ts *testSuite) TestTableFromMeta(c *C) {

View File

@ -33,6 +33,7 @@ import (
"github.com/pingcap/tidb/util/mock"
"github.com/pingcap/tidb/util/testleak"
"github.com/pingcap/tidb/util/types"
goctx "golang.org/x/net/context"
)
func TestT(t *testing.T) {
@ -115,7 +116,7 @@ func (s *testSuite) SetUpSuite(c *C) {
err = t.CreateTable(s.dbInfo.ID, s.tbInfo)
c.Assert(err, IsNil)
err = txn.Commit()
err = txn.Commit(goctx.Background())
c.Assert(err, IsNil)
}
@ -128,7 +129,7 @@ func (s *testSuite) TearDownSuite(c *C) {
c.Assert(err, IsNil)
err = t.DropDatabase(s.dbInfo.ID)
c.Assert(err, IsNil)
err = txn.Commit()
err = txn.Commit(goctx.Background())
c.Assert(err, IsNil)
err = s.store.Close()
@ -282,7 +283,7 @@ func (s *testSuite) TestScan(c *C) {
c.Assert(s.ctx.NewTxn(), IsNil)
_, err = tb.AddRecord(s.ctx, types.MakeDatums(1, 10, 11))
c.Assert(err, IsNil)
c.Assert(s.ctx.Txn().Commit(), IsNil)
c.Assert(s.ctx.Txn().Commit(goctx.Background()), IsNil)
record1 := &RecordData{Handle: int64(1), Values: types.MakeDatums(int64(1), int64(10), int64(11))}
record2 := &RecordData{Handle: int64(2), Values: types.MakeDatums(int64(2), int64(20), int64(21))}
@ -295,7 +296,7 @@ func (s *testSuite) TestScan(c *C) {
c.Assert(s.ctx.NewTxn(), IsNil)
_, err = tb.AddRecord(s.ctx, record2.Values)
c.Assert(err, IsNil)
c.Assert(s.ctx.Txn().Commit(), IsNil)
c.Assert(s.ctx.Txn().Commit(goctx.Background()), IsNil)
txn, err := s.store.Begin()
c.Assert(err, IsNil)
@ -337,7 +338,7 @@ func (s *testSuite) TestScan(c *C) {
c.Assert(err, IsNil)
err = tb.RemoveRecord(s.ctx, 2, record2.Values)
c.Assert(err, IsNil)
c.Assert(s.ctx.Txn().Commit(), IsNil)
c.Assert(s.ctx.Txn().Commit(goctx.Background()), IsNil)
}
func newDiffRetError(prefix string, ra, rb *RecordData) string {
@ -412,7 +413,7 @@ func (s *testSuite) testIndex(c *C, tb table.Table, idx table.Index) {
c.Assert(err, IsNil)
key := tablecodec.EncodeRowKey(tb.Meta().ID, codec.EncodeInt(nil, 4))
setColValue(c, txn, key, types.NewDatum(int64(40)))
err = txn.Commit()
err = txn.Commit(goctx.Background())
c.Assert(err, IsNil)
txn, err = s.store.Begin()
@ -430,7 +431,7 @@ func (s *testSuite) testIndex(c *C, tb table.Table, idx table.Index) {
c.Assert(err, IsNil)
key = tablecodec.EncodeRowKey(tb.Meta().ID, codec.EncodeInt(nil, 3))
setColValue(c, txn, key, types.NewDatum(int64(31)))
err = txn.Commit()
err = txn.Commit(goctx.Background())
c.Assert(err, IsNil)
txn, err = s.store.Begin()
@ -448,7 +449,7 @@ func (s *testSuite) testIndex(c *C, tb table.Table, idx table.Index) {
txn.Delete(key)
key = tablecodec.EncodeRowKey(tb.Meta().ID, codec.EncodeInt(nil, 5))
setColValue(c, txn, key, types.NewDatum(int64(30)))
err = txn.Commit()
err = txn.Commit(goctx.Background())
c.Assert(err, IsNil)
txn, err = s.store.Begin()
@ -466,7 +467,7 @@ func (s *testSuite) testIndex(c *C, tb table.Table, idx table.Index) {
txn.Delete(key)
key = tablecodec.EncodeRowKey(tb.Meta().ID, codec.EncodeInt(nil, 3))
setColValue(c, txn, key, types.NewDatum(int64(30)))
err = txn.Commit()
err = txn.Commit(goctx.Background())
c.Assert(err, IsNil)
txn, err = s.store.Begin()
@ -484,7 +485,7 @@ func (s *testSuite) testIndex(c *C, tb table.Table, idx table.Index) {
c.Assert(err, IsNil)
key = tablecodec.EncodeRowKey(tb.Meta().ID, codec.EncodeInt(nil, 4))
setColValue(c, txn, key, types.NewDatum(int64(40)))
err = txn.Commit()
err = txn.Commit(goctx.Background())
c.Assert(err, IsNil)
txn, err = s.store.Begin()

View File

@ -106,7 +106,7 @@ func (c *Context) NewTxn() error {
return errors.New("store is not set")
}
if c.txn != nil && c.txn.Valid() {
err := c.txn.Commit()
err := c.txn.Commit(c.ctx)
if err != nil {
return errors.Trace(err)
}

View File

@ -23,6 +23,7 @@ import (
"github.com/pingcap/tidb/store/localstore/goleveldb"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/testleak"
goctx "golang.org/x/net/context"
)
const (
@ -116,7 +117,7 @@ func (c *MockContext) CommitTxn() error {
if c.txn == nil {
return nil
}
return c.txn.Commit()
return c.txn.Commit(goctx.Background())
}
func (s *testPrefixSuite) TestPrefix(c *C) {
@ -139,7 +140,7 @@ func (s *testPrefixSuite) TestPrefix(c *C) {
return true
})
c.Assert(err, IsNil)
err = txn.Commit()
err = txn.Commit(goctx.Background())
c.Assert(err, IsNil)
}