From a3851001f4e6218d4c60b51769cba2e4b3b4a470 Mon Sep 17 00:00:00 2001 From: siddontang Date: Thu, 29 Oct 2015 17:01:57 +0800 Subject: [PATCH 1/3] structure: remove lock keys in Inc --- structure/string.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/structure/string.go b/structure/string.go index b65282514a..4a4f6e64ec 100644 --- a/structure/string.go +++ b/structure/string.go @@ -56,10 +56,7 @@ func (t *TxStructure) GetInt64(key []byte) (int64, error) { // the value after the increment. func (t *TxStructure) Inc(key []byte, step int64) (int64, error) { ek := t.encodeStringDataKey(key) - if err := t.txn.LockKeys(ek); err != nil { - return 0, errors.Trace(err) - } - + // txn Inc will lock this key, so we don't lock it here. n, err := t.txn.Inc(ek, step) if errors2.ErrorEqual(err, kv.ErrNotExist) { err = nil From 0c9a4e324b7aa7601e00f0b9a3dcaf6e2789db1c Mon Sep 17 00:00:00 2001 From: siddontang Date: Thu, 29 Oct 2015 17:25:05 +0800 Subject: [PATCH 2/3] *: cleanup meta and structure. --- ddl/ddl.go | 41 ++++++++++++++++------ ddl/worker.go | 17 ++++++--- domain/domain.go | 7 ++-- infoschema/infoschema.go | 7 ++-- meta/autoid/autoid.go | 20 ++++++----- meta/autoid/autoid_test.go | 7 ++-- meta/meta.go | 63 +++------------------------------ meta/meta_test.go | 40 ++++++--------------- structure/structure.go | 69 +++---------------------------------- structure/structure_test.go | 36 ++++++++++--------- 10 files changed, 102 insertions(+), 205 deletions(-) diff --git a/ddl/ddl.go b/ddl/ddl.go index bccc304226..dfe1b82cfd 100644 --- a/ddl/ddl.go +++ b/ddl/ddl.go @@ -62,7 +62,7 @@ type DDL interface { type ddl struct { infoHandle *infoschema.Handle onDDLChange OnDDLChange - meta *meta.Meta + store kv.Storage // schema lease seconds. lease int uuid string @@ -78,7 +78,7 @@ func NewDDL(store kv.Storage, infoHandle *infoschema.Handle, hook OnDDLChange, l d := &ddl{ infoHandle: infoHandle, onDDLChange: hook, - meta: meta.NewMeta(store), + store: store, lease: lease, uuid: uuid.NewV4().String(), jobCh: make(chan struct{}, 1), @@ -92,6 +92,19 @@ func (d *ddl) GetInformationSchema() infoschema.InfoSchema { return d.infoHandle.Get() } +func (d *ddl) genGlobalID() (int64, error) { + var globalID int64 + err := kv.RunInNewTxn(d.store, true, func(txn kv.Transaction) error { + t := meta.NewMeta(txn) + + var err1 error + globalID, err1 = t.GenGlobalID() + return errors.Trace(err1) + }) + + return globalID, errors.Trace(err) +} + func (d *ddl) CreateSchema(ctx context.Context, schema model.CIStr) (err error) { is := d.GetInformationSchema() _, ok := is.SchemaByName(schema) @@ -99,12 +112,13 @@ func (d *ddl) CreateSchema(ctx context.Context, schema model.CIStr) (err error) return errors.Trace(ErrExists) } info := &model.DBInfo{Name: schema} - info.ID, err = d.meta.GenGlobalID() + info.ID, err = d.genGlobalID() if err != nil { return errors.Trace(err) } - err = d.meta.RunInNewTxn(false, func(t *meta.TMeta) error { + err = kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error { + t := meta.NewMeta(txn) err := d.verifySchemaMetaVersion(t, is.SchemaMetaVersion()) if err != nil { return errors.Trace(err) @@ -172,7 +186,8 @@ func (d *ddl) DropSchema(ctx context.Context, schema model.CIStr) (err error) { } } - err = d.meta.RunInNewTxn(false, func(t *meta.TMeta) error { + err = kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error { + t := meta.NewMeta(txn) err := d.verifySchemaMetaVersion(t, is.SchemaMetaVersion()) if err != nil { return errors.Trace(err) @@ -276,7 +291,7 @@ func (d *ddl) buildColumnAndConstraint(offset int, colDef *coldef.ColumnDef) (*c if err != nil { return nil, nil, errors.Trace(err) } - col.ID, err = d.meta.GenGlobalID() + col.ID, err = d.genGlobalID() if err != nil { return nil, nil, errors.Trace(err) } @@ -331,7 +346,7 @@ func (d *ddl) buildTableInfo(tableName model.CIStr, cols []*column.Col, constrai tbInfo = &model.TableInfo{ Name: tableName, } - tbInfo.ID, err = d.meta.GenGlobalID() + tbInfo.ID, err = d.genGlobalID() if err != nil { return nil, errors.Trace(err) } @@ -399,7 +414,8 @@ func (d *ddl) CreateTable(ctx context.Context, ident table.Ident, colDefs []*col } log.Infof("New table: %+v", tbInfo) - err = d.meta.RunInNewTxn(false, func(t *meta.TMeta) error { + err = kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error { + t := meta.NewMeta(txn) err := d.verifySchemaMetaVersion(t, is.SchemaMetaVersion()) if err != nil { return errors.Trace(err) @@ -500,7 +516,8 @@ func (d *ddl) addColumn(ctx context.Context, schema *model.DBInfo, tbl table.Tab } // update infomation schema - err = d.meta.RunInNewTxn(false, func(t *meta.TMeta) error { + err = kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error { + t := meta.NewMeta(txn) err := d.verifySchemaMetaVersion(t, schemaMetaVersion) if err != nil { return errors.Trace(err) @@ -562,7 +579,8 @@ func (d *ddl) DropTable(ctx context.Context, ti table.Ident) (err error) { return errors.Trace(err) } - err = d.meta.RunInNewTxn(false, func(t *meta.TMeta) error { + err = kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error { + t := meta.NewMeta(txn) err := d.verifySchemaMetaVersion(t, is.SchemaMetaVersion()) if err != nil { return errors.Trace(err) @@ -659,7 +677,8 @@ func (d *ddl) CreateIndex(ctx context.Context, ti table.Ident, unique bool, inde } // update InfoSchema - err = d.meta.RunInNewTxn(false, func(t *meta.TMeta) error { + err = kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error { + t := meta.NewMeta(txn) err := d.verifySchemaMetaVersion(t, is.SchemaMetaVersion()) if err != nil { return errors.Trace(err) diff --git a/ddl/worker.go b/ddl/worker.go index 475fb21086..68435c9aeb 100644 --- a/ddl/worker.go +++ b/ddl/worker.go @@ -19,6 +19,7 @@ import ( "github.com/juju/errors" "github.com/ngaut/log" "github.com/pingcap/tidb/context" + "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/meta" "github.com/pingcap/tidb/model" ) @@ -30,7 +31,8 @@ func (d *ddl) startJob(ctx context.Context, job *model.Job) error { } // Create a new job and queue it. - err := d.meta.RunInNewTxn(false, func(t *meta.TMeta) error { + err := kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error { + t := meta.NewMeta(txn) var err error job.ID, err = t.GenGlobalID() if err != nil { @@ -78,7 +80,8 @@ func (d *ddl) startJob(ctx context.Context, job *model.Job) error { func (d *ddl) getHistoryJob(id int64) (*model.Job, error) { var job *model.Job - err := d.meta.RunInNewTxn(false, func(t *meta.TMeta) error { + err := kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error { + t := meta.NewMeta(txn) var err1 error job, err1 = t.GetHistoryDDLJob(id) return errors.Trace(err1) @@ -139,7 +142,9 @@ func (d *ddl) updateJob(t *meta.TMeta, job *model.Job) error { func (d *ddl) getFirstJob() (*model.Job, error) { var job *model.Job - err := d.meta.RunInNewTxn(true, func(t *meta.TMeta) error { + err := kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error { + t := meta.NewMeta(txn) + var err1 error job, err1 = t.GetDDLJob(0) return errors.Trace(err1) @@ -150,7 +155,8 @@ func (d *ddl) getFirstJob() (*model.Job, error) { func (d *ddl) finishJob(job *model.Job) error { // done, notice and run next job. - err := d.meta.RunInNewTxn(false, func(t *meta.TMeta) error { + err := kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error { + t := meta.NewMeta(txn) err := d.verifyOwner(t) if err != nil { return errors.Trace(err) @@ -169,7 +175,8 @@ func (d *ddl) finishJob(job *model.Job) error { } func (d *ddl) checkOwner() error { - err := d.meta.RunInNewTxn(false, func(t *meta.TMeta) error { + err := kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error { + t := meta.NewMeta(txn) return errors.Trace(d.verifyOwner(t)) }) diff --git a/domain/domain.go b/domain/domain.go index 082a5a9cc9..0b669fd38c 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -30,11 +30,11 @@ type Domain struct { store kv.Storage infoHandle *infoschema.Handle ddl ddl.DDL - meta *meta.Meta lease int } -func (do *Domain) loadInfoSchema(m *meta.TMeta) (err error) { +func (do *Domain) loadInfoSchema(txn kv.Transaction) (err error) { + m := meta.NewMeta(txn) schemaMetaVersion, err := m.GetSchemaVersion() if err != nil { return errors.Trace(err) @@ -90,7 +90,7 @@ func (do *Domain) onDDLChange(err error) error { } func (do *Domain) reload() error { - err := do.meta.RunInNewTxn(false, do.loadInfoSchema) + err := kv.RunInNewTxn(do.store, false, do.loadInfoSchema) return errors.Trace(err) } @@ -116,7 +116,6 @@ func (do *Domain) loadSchemaInLoop() { func NewDomain(store kv.Storage, lease int) (d *Domain, err error) { d = &Domain{ store: store, - meta: meta.NewMeta(store), lease: lease, } diff --git a/infoschema/infoschema.go b/infoschema/infoschema.go index 2982e31db4..015600066b 100644 --- a/infoschema/infoschema.go +++ b/infoschema/infoschema.go @@ -18,7 +18,6 @@ import ( "github.com/juju/errors" "github.com/pingcap/tidb/kv" - "github.com/pingcap/tidb/meta" "github.com/pingcap/tidb/meta/autoid" "github.com/pingcap/tidb/model" "github.com/pingcap/tidb/table" @@ -191,13 +190,13 @@ func (is *infoSchema) Clone() (result []*model.DBInfo) { // Handle handles information schema, including getting and setting. type Handle struct { value atomic.Value - meta *meta.Meta + store kv.Storage } // NewHandle creates a new Handle. func NewHandle(store kv.Storage) *Handle { return &Handle{ - meta: meta.NewMeta(store), + store: store, } } @@ -218,7 +217,7 @@ func (h *Handle) Set(newInfo []*model.DBInfo, schemaMetaVersion int64) { info.schemas[di.ID] = di info.schemaNameToID[di.Name.L] = di.ID for _, t := range di.Tables { - alloc := autoid.NewAllocator(h.meta, di.ID) + alloc := autoid.NewAllocator(h.store, di.ID) info.tables[t.ID] = table.TableFromMeta(di.Name.L, alloc, t) tname := tableName{di.Name.L, t.Name.L} info.tableNameToID[tname] = t.ID diff --git a/meta/autoid/autoid.go b/meta/autoid/autoid.go index 8f12501fe0..d08466ff65 100644 --- a/meta/autoid/autoid.go +++ b/meta/autoid/autoid.go @@ -18,6 +18,7 @@ import ( "github.com/juju/errors" "github.com/ngaut/log" + "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/meta" ) @@ -32,11 +33,11 @@ type Allocator interface { } type allocator struct { - mu sync.Mutex - base int64 - end int64 - m *meta.Meta - dbID int64 + mu sync.Mutex + base int64 + end int64 + store kv.Storage + dbID int64 } // Alloc allocs the next autoID for table with tableID. @@ -48,7 +49,8 @@ func (alloc *allocator) Alloc(tableID int64) (int64, error) { alloc.mu.Lock() defer alloc.mu.Unlock() if alloc.base == alloc.end { // step - err := alloc.m.RunInNewTxn(true, func(m *meta.TMeta) error { + err := kv.RunInNewTxn(alloc.store, true, func(txn kv.Transaction) error { + m := meta.NewMeta(txn) // err1 is used for passing `go tool vet --shadow` check. end, err1 := m.GenAutoTableID(alloc.dbID, tableID, step) if err1 != nil { @@ -71,9 +73,9 @@ func (alloc *allocator) Alloc(tableID int64) (int64, error) { } // NewAllocator returns a new auto increment id generator on the store. -func NewAllocator(m *meta.Meta, dbID int64) Allocator { +func NewAllocator(store kv.Storage, dbID int64) Allocator { return &allocator{ - m: m, - dbID: dbID, + store: store, + dbID: dbID, } } diff --git a/meta/autoid/autoid_test.go b/meta/autoid/autoid_test.go index f38e711950..03dfc03bbc 100644 --- a/meta/autoid/autoid_test.go +++ b/meta/autoid/autoid_test.go @@ -17,6 +17,7 @@ import ( "testing" . "github.com/pingcap/check" + "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/meta" "github.com/pingcap/tidb/meta/autoid" "github.com/pingcap/tidb/model" @@ -39,8 +40,8 @@ func (*testSuite) TestT(c *C) { c.Assert(err, IsNil) defer store.Close() - m := meta.NewMeta(store) - err = m.RunInNewTxn(false, func(m *meta.TMeta) error { + err = kv.RunInNewTxn(store, false, func(txn kv.Transaction) error { + m := meta.NewMeta(txn) err = m.CreateDatabase(&model.DBInfo{ID: 1, Name: model.NewCIStr("a")}) c.Assert(err, IsNil) err = m.CreateTable(1, &model.TableInfo{ID: 1, Name: model.NewCIStr("t")}) @@ -49,7 +50,7 @@ func (*testSuite) TestT(c *C) { }) c.Assert(err, IsNil) - alloc := autoid.NewAllocator(m, 1) + alloc := autoid.NewAllocator(store, 1) c.Assert(alloc, NotNil) id, err := alloc.Alloc(1) diff --git a/meta/meta.go b/meta/meta.go index 1395c719fd..5726bd1eb7 100644 --- a/meta/meta.go +++ b/meta/meta.go @@ -66,60 +66,15 @@ var ( ErrTableNotExists = errors.New("table doesn't exist") ) -// Meta is the structure saving meta information. -type Meta struct { - store *structure.TStore -} - // TMeta is for handling meta information in a transaction. type TMeta struct { txn *structure.TxStructure } -// NewMeta creates a Meta with kv storage. -func NewMeta(store kv.Storage) *Meta { - m := &Meta{ - store: structure.NewStore(store, []byte{0x00}), - } - return m -} - -// Begin creates a TMeta object and you can handle meta information in a transaction. -func (m *Meta) Begin() (*TMeta, error) { - t := &TMeta{} - - var err error - t.txn, err = m.store.Begin() - if err != nil { - return nil, errors.Trace(err) - } - return t, nil -} - -// RunInNewTxn runs fn in a new transaction. -func (m *Meta) RunInNewTxn(retryable bool, f func(t *TMeta) error) error { - fn := func(txn *structure.TxStructure) error { - t := &TMeta{txn: txn} - return errors.Trace(f(t)) - } - - err := m.store.RunInNewTxn(retryable, fn) - return errors.Trace(err) -} - -// GenGlobalID generates next id globally. -func (m *Meta) GenGlobalID() (int64, error) { - var ( - id int64 - err error - ) - - err1 := m.RunInNewTxn(true, func(t *TMeta) error { - id, err = t.GenGlobalID() - return errors.Trace(err) - }) - - return id, errors.Trace(err1) +// NewMeta creates a TMeta in transaction txn. +func NewMeta(txn kv.Transaction) *TMeta { + t := structure.NewStructure(txn, []byte{0x00}) + return &TMeta{txn: t} } // GenGlobalID generates next id globally. @@ -556,13 +511,3 @@ func (m *TMeta) GetHistoryDDLJob(id int64) (*model.Job, error) { err = json.Unmarshal(value, job) return job, errors.Trace(err) } - -// Commit commits the transaction. -func (m *TMeta) Commit() error { - return m.txn.Commit() -} - -// Rollback rolls back the transaction. -func (m *TMeta) Rollback() error { - return m.txn.Rollback() -} diff --git a/meta/meta_test.go b/meta/meta_test.go index b50e70b5e5..5202e97e7c 100644 --- a/meta/meta_test.go +++ b/meta/meta_test.go @@ -38,12 +38,12 @@ func (s *testSuite) TestMeta(c *C) { c.Assert(err, IsNil) defer store.Close() - m := meta.NewMeta(store) - - t, err := m.Begin() + txn, err := store.Begin() c.Assert(err, IsNil) - defer t.Rollback() + defer txn.Rollback() + + t := meta.NewMeta(txn) n, err := t.GenGlobalID() c.Assert(err, IsNil) @@ -146,28 +146,8 @@ func (s *testSuite) TestMeta(c *C) { c.Assert(err, IsNil) c.Assert(dbs, HasLen, 0) - err = t.Commit() + err = txn.Commit() c.Assert(err, IsNil) - - fn := func(m *meta.TMeta) error { - n, err = m.GenSchemaVersion() - c.Assert(err, IsNil) - - var n1 int64 - n1, err = m.GetSchemaVersion() - c.Assert(err, IsNil) - c.Assert(n, Equals, n1) - return nil - } - - err = m.RunInNewTxn(false, fn) - c.Assert(err, IsNil) - - n, err = m.GenGlobalID() - c.Assert(err, IsNil) - n1, err := m.GenGlobalID() - c.Assert(err, IsNil) - c.Assert(n1, Equals, n+1) } func (s *testSuite) TestDDL(c *C) { @@ -176,12 +156,12 @@ func (s *testSuite) TestDDL(c *C) { c.Assert(err, IsNil) defer store.Close() - m := meta.NewMeta(store) - - t, err := m.Begin() + txn, err := store.Begin() c.Assert(err, IsNil) - defer t.Rollback() + defer txn.Rollback() + + t := meta.NewMeta(txn) owner := &model.Owner{OwnerID: "1"} err = t.SetDDLOwner(owner) @@ -220,6 +200,6 @@ func (s *testSuite) TestDDL(c *C) { c.Assert(err, IsNil) c.Assert(v, DeepEquals, job) - err = t.Commit() + err = txn.Commit() c.Assert(err, IsNil) } diff --git a/structure/structure.go b/structure/structure.go index 4f185d663e..61fe269f89 100644 --- a/structure/structure.go +++ b/structure/structure.go @@ -13,78 +13,19 @@ package structure -import ( - "github.com/juju/errors" - "github.com/pingcap/tidb/kv" -) +import "github.com/pingcap/tidb/kv" -// ErrTxDone is the error returned after transaction has already been committed or rolled back. -var ErrTxDone = errors.New("Transaction has already been committed or rolled back") - -// TStore is the storage for data structure. -type TStore struct { - store kv.Storage - prefix []byte -} - -// NewStore creates a TStore with kv storage and special key prefix. -func NewStore(store kv.Storage, prefix []byte) *TStore { - s := &TStore{ - store: store, +// NewStructure creates a TxStructure in transaction txn and with key prefix. +func NewStructure(txn kv.Transaction, prefix []byte) *TxStructure { + return &TxStructure{ + txn: txn, prefix: prefix, } - return s -} - -// Begin creates a TxStructure for calling structure APIs in a transaction later. -func (s *TStore) Begin() (*TxStructure, error) { - t := &TxStructure{done: false, prefix: s.prefix} - - var err error - t.txn, err = s.store.Begin() - if err != nil { - return nil, errors.Trace(err) - } - - return t, nil -} - -// RunInNewTxn runs f in a new transaction -func (s *TStore) RunInNewTxn(retryable bool, f func(t *TxStructure) error) error { - fn := func(txn kv.Transaction) error { - t := &TxStructure{done: false, prefix: s.prefix, txn: txn} - return errors.Trace(f(t)) - } - err := kv.RunInNewTxn(s.store, retryable, fn) - return errors.Trace(err) } // TxStructure supports some simple data structures like string, hash, list, etc... and // you can use these in a transaction. type TxStructure struct { txn kv.Transaction - done bool prefix []byte } - -// Commit commits the transaction. -func (t *TxStructure) Commit() error { - if t.done { - return ErrTxDone - } - - t.done = true - - return errors.Trace(t.txn.Commit()) -} - -// Rollback rolls back the transaction. -func (t *TxStructure) Rollback() error { - if t.done { - return ErrTxDone - } - - t.done = true - - return errors.Trace(t.txn.Rollback()) -} diff --git a/structure/structure_test.go b/structure/structure_test.go index 9a769c259a..3b1afd348d 100644 --- a/structure/structure_test.go +++ b/structure/structure_test.go @@ -17,18 +17,19 @@ import ( "testing" . "github.com/pingcap/check" + "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/store/localstore" "github.com/pingcap/tidb/store/localstore/goleveldb" ) -func TesTxStructure(t *testing.T) { +func TestTxStructure(t *testing.T) { TestingT(t) } var _ = Suite(&tesTxStructureSuite{}) type tesTxStructureSuite struct { - s *TStore + store kv.Storage } func (s *tesTxStructureSuite) SetUpSuite(c *C) { @@ -38,19 +39,20 @@ func (s *tesTxStructureSuite) SetUpSuite(c *C) { } store, err := d.Open(path) c.Assert(err, IsNil) - s.s = NewStore(store, []byte{0x00}) + s.store = store } func (s *tesTxStructureSuite) TearDownSuite(c *C) { - err := s.s.store.Close() + err := s.store.Close() c.Assert(err, IsNil) } func (s *tesTxStructureSuite) TestString(c *C) { - tx, err := s.s.Begin() + txn, err := s.store.Begin() c.Assert(err, IsNil) + defer txn.Rollback() - defer tx.Rollback() + tx := NewStructure(txn, []byte{0x00}) key := []byte("a") value := []byte("1") @@ -80,15 +82,16 @@ func (s *tesTxStructureSuite) TestString(c *C) { c.Assert(err, IsNil) c.Assert(v, IsNil) - err = tx.Commit() + err = txn.Commit() c.Assert(err, IsNil) } func (s *tesTxStructureSuite) TestList(c *C) { - tx, err := s.s.Begin() + txn, err := s.store.Begin() c.Assert(err, IsNil) + defer txn.Rollback() - defer tx.Rollback() + tx := NewStructure(txn, []byte{0x00}) key := []byte("a") err = tx.LPush(key, []byte("3"), []byte("2"), []byte("1")) @@ -161,15 +164,16 @@ func (s *tesTxStructureSuite) TestList(c *C) { c.Assert(err, IsNil) c.Assert(l, Equals, int64(0)) - err = tx.Commit() + err = txn.Commit() c.Assert(err, IsNil) } func (s *tesTxStructureSuite) TestHash(c *C) { - tx, err := s.s.Begin() + txn, err := s.store.Begin() c.Assert(err, IsNil) + defer txn.Rollback() - defer tx.Rollback() + tx := NewStructure(txn, []byte{0x00}) key := []byte("a") @@ -242,10 +246,11 @@ func (s *tesTxStructureSuite) TestHash(c *C) { err = tx.HDel(key, []byte("fake_key")) c.Assert(err, IsNil) - err = tx.Commit() + err = txn.Commit() c.Assert(err, IsNil) - fn := func(t *TxStructure) error { + err = kv.RunInNewTxn(s.store, false, func(txn kv.Transaction) error { + t := NewStructure(txn, []byte{0x00}) err = t.Set(key, []byte("abc")) c.Assert(err, IsNil) @@ -253,7 +258,6 @@ func (s *tesTxStructureSuite) TestHash(c *C) { c.Assert(err, IsNil) c.Assert(value, DeepEquals, []byte("abc")) return nil - } - err = s.s.RunInNewTxn(false, fn) + }) c.Assert(err, IsNil) } From e2b0b88589fb3aae5b154f2befe1e60f5b3fb1e9 Mon Sep 17 00:00:00 2001 From: ngaut Date: Thu, 29 Oct 2015 18:23:45 +0800 Subject: [PATCH 3/3] ddl: Rename TMeta to Meta Clean up --- ddl/ddl.go | 10 +++--- ddl/worker.go | 4 +-- meta/meta.go | 92 +++++++++++++++++++++++++++------------------------ 3 files changed, 55 insertions(+), 51 deletions(-) diff --git a/ddl/ddl.go b/ddl/ddl.go index dfe1b82cfd..92aab675c5 100644 --- a/ddl/ddl.go +++ b/ddl/ddl.go @@ -95,11 +95,9 @@ func (d *ddl) GetInformationSchema() infoschema.InfoSchema { func (d *ddl) genGlobalID() (int64, error) { var globalID int64 err := kv.RunInNewTxn(d.store, true, func(txn kv.Transaction) error { - t := meta.NewMeta(txn) - - var err1 error - globalID, err1 = t.GenGlobalID() - return errors.Trace(err1) + var err error + globalID, err = meta.NewMeta(txn).GenGlobalID() + return errors.Trace(err) }) return globalID, errors.Trace(err) @@ -135,7 +133,7 @@ func (d *ddl) CreateSchema(ctx context.Context, schema model.CIStr) (err error) return errors.Trace(err) } -func (d *ddl) verifySchemaMetaVersion(t *meta.TMeta, schemaMetaVersion int64) error { +func (d *ddl) verifySchemaMetaVersion(t *meta.Meta, schemaMetaVersion int64) error { curVer, err := t.GetSchemaVersion() if err != nil { return errors.Trace(err) diff --git a/ddl/worker.go b/ddl/worker.go index 68435c9aeb..22ee779ca4 100644 --- a/ddl/worker.go +++ b/ddl/worker.go @@ -97,7 +97,7 @@ func asyncNotify(ch chan struct{}) { } } -func (d *ddl) verifyOwner(t *meta.TMeta) error { +func (d *ddl) verifyOwner(t *meta.Meta) error { owner, err := t.GetDDLOwner() if err != nil { return errors.Trace(err) @@ -129,7 +129,7 @@ func (d *ddl) verifyOwner(t *meta.TMeta) error { } // every time we enter another state, we must call this function. -func (d *ddl) updateJob(t *meta.TMeta, job *model.Job) error { +func (d *ddl) updateJob(t *meta.Meta, job *model.Job) error { err := d.verifyOwner(t) if err != nil { return errors.Trace(err) diff --git a/meta/meta.go b/meta/meta.go index 5726bd1eb7..c10bf9052e 100644 --- a/meta/meta.go +++ b/meta/meta.go @@ -66,19 +66,19 @@ var ( ErrTableNotExists = errors.New("table doesn't exist") ) -// TMeta is for handling meta information in a transaction. -type TMeta struct { +// Meta is for handling meta information in a transaction. +type Meta struct { txn *structure.TxStructure } -// NewMeta creates a TMeta in transaction txn. -func NewMeta(txn kv.Transaction) *TMeta { +// NewMeta creates a Meta in transaction txn. +func NewMeta(txn kv.Transaction) *Meta { t := structure.NewStructure(txn, []byte{0x00}) - return &TMeta{txn: t} + return &Meta{txn: t} } // GenGlobalID generates next id globally. -func (m *TMeta) GenGlobalID() (int64, error) { +func (m *Meta) GenGlobalID() (int64, error) { globalIDMutex.Lock() defer globalIDMutex.Unlock() @@ -86,15 +86,15 @@ func (m *TMeta) GenGlobalID() (int64, error) { } // GetGlobalID gets current global id. -func (m *TMeta) GetGlobalID() (int64, error) { +func (m *Meta) GetGlobalID() (int64, error) { return m.txn.GetInt64(mNextGlobalIDKey) } -func (m *TMeta) dbKey(dbID int64) []byte { +func (m *Meta) dbKey(dbID int64) []byte { return []byte(fmt.Sprintf("%s:%d", mDBPrefix, dbID)) } -func (m *TMeta) parseDatabaseID(key string) (int64, error) { +func (m *Meta) parseDatabaseID(key string) (int64, error) { seps := strings.Split(key, ":") if len(seps) != 2 { return 0, errors.Errorf("invalid db key %s", key) @@ -104,15 +104,15 @@ func (m *TMeta) parseDatabaseID(key string) (int64, error) { return n, errors.Trace(err) } -func (m *TMeta) autoTalbeIDKey(tableID int64) []byte { +func (m *Meta) autoTalbeIDKey(tableID int64) []byte { return []byte(fmt.Sprintf("%s:%d", mTableIDPrefix, tableID)) } -func (m *TMeta) tableKey(tableID int64) []byte { +func (m *Meta) tableKey(tableID int64) []byte { return []byte(fmt.Sprintf("%s:%d", mTablePrefix, tableID)) } -func (m *TMeta) parseTableID(key string) (int64, error) { +func (m *Meta) parseTableID(key string) (int64, error) { seps := strings.Split(key, ":") if len(seps) != 2 { return 0, errors.Errorf("invalid table meta key %s", key) @@ -123,7 +123,7 @@ func (m *TMeta) parseTableID(key string) (int64, error) { } // GenAutoTableID adds step to the auto id of the table and returns the sum. -func (m *TMeta) GenAutoTableID(dbID int64, tableID int64, step int64) (int64, error) { +func (m *Meta) GenAutoTableID(dbID int64, tableID int64, step int64) (int64, error) { // check db exists dbKey := m.dbKey(dbID) if err := m.checkDBExists(dbKey); err != nil { @@ -140,21 +140,21 @@ func (m *TMeta) GenAutoTableID(dbID int64, tableID int64, step int64) (int64, er } // GetAutoTableID gets current auto id with table id. -func (m *TMeta) GetAutoTableID(dbID int64, tableID int64) (int64, error) { +func (m *Meta) GetAutoTableID(dbID int64, tableID int64) (int64, error) { return m.txn.HGetInt64(m.dbKey(dbID), m.autoTalbeIDKey(tableID)) } // GetSchemaVersion gets current global schema version. -func (m *TMeta) GetSchemaVersion() (int64, error) { +func (m *Meta) GetSchemaVersion() (int64, error) { return m.txn.GetInt64(mSchemaVersionKey) } // GenSchemaVersion generates next schema version. -func (m *TMeta) GenSchemaVersion() (int64, error) { +func (m *Meta) GenSchemaVersion() (int64, error) { return m.txn.Inc(mSchemaVersionKey, 1) } -func (m *TMeta) checkDBExists(dbKey []byte) error { +func (m *Meta) checkDBExists(dbKey []byte) error { v, err := m.txn.HGet(mDBs, dbKey) if err != nil { return errors.Trace(err) @@ -165,33 +165,39 @@ func (m *TMeta) checkDBExists(dbKey []byte) error { return nil } -func (m *TMeta) checkDBNotExists(dbKey []byte) error { +func (m *Meta) checkDBNotExists(dbKey []byte) error { v, err := m.txn.HGet(mDBs, dbKey) if err != nil { return errors.Trace(err) - } else if v != nil { + } + + if v != nil { return ErrDBExists } return nil } -func (m *TMeta) checkTableExists(dbKey []byte, tableKey []byte) error { +func (m *Meta) checkTableExists(dbKey []byte, tableKey []byte) error { v, err := m.txn.HGet(dbKey, tableKey) if err != nil { return errors.Trace(err) - } else if v == nil { + } + + if v == nil { return ErrTableNotExists } return nil } -func (m *TMeta) checkTableNotExists(dbKey []byte, tableKey []byte) error { +func (m *Meta) checkTableNotExists(dbKey []byte, tableKey []byte) error { v, err := m.txn.HGet(dbKey, tableKey) if err != nil { return errors.Trace(err) - } else if v != nil { + } + + if v != nil { return ErrTableExists } @@ -199,7 +205,7 @@ func (m *TMeta) checkTableNotExists(dbKey []byte, tableKey []byte) error { } // CreateDatabase creates a database with db info. -func (m *TMeta) CreateDatabase(dbInfo *model.DBInfo) error { +func (m *Meta) CreateDatabase(dbInfo *model.DBInfo) error { dbKey := m.dbKey(dbInfo.ID) if err := m.checkDBNotExists(dbKey); err != nil { @@ -215,7 +221,7 @@ func (m *TMeta) CreateDatabase(dbInfo *model.DBInfo) error { } // UpdateDatabase updates a database with db info. -func (m *TMeta) UpdateDatabase(dbInfo *model.DBInfo) error { +func (m *Meta) UpdateDatabase(dbInfo *model.DBInfo) error { dbKey := m.dbKey(dbInfo.ID) if err := m.checkDBExists(dbKey); err != nil { @@ -231,7 +237,7 @@ func (m *TMeta) UpdateDatabase(dbInfo *model.DBInfo) error { } // CreateTable creates a table with tableInfo in database. -func (m *TMeta) CreateTable(dbID int64, tableInfo *model.TableInfo) error { +func (m *Meta) CreateTable(dbID int64, tableInfo *model.TableInfo) error { // first check db exists or not. dbKey := m.dbKey(dbID) if err := m.checkDBExists(dbKey); err != nil { @@ -253,7 +259,7 @@ func (m *TMeta) CreateTable(dbID int64, tableInfo *model.TableInfo) error { } // DropDatabase drops whole database. -func (m *TMeta) DropDatabase(dbID int64) error { +func (m *Meta) DropDatabase(dbID int64) error { // first check db exists or not. dbKey := m.dbKey(dbID) @@ -269,7 +275,7 @@ func (m *TMeta) DropDatabase(dbID int64) error { } // DropTable drops table in database. -func (m *TMeta) DropTable(dbID int64, tableID int64) error { +func (m *Meta) DropTable(dbID int64, tableID int64) error { // first check db exists or not. dbKey := m.dbKey(dbID) if err := m.checkDBExists(dbKey); err != nil { @@ -295,7 +301,7 @@ func (m *TMeta) DropTable(dbID int64, tableID int64) error { } // UpdateTable updates the table with table info. -func (m *TMeta) UpdateTable(dbID int64, tableInfo *model.TableInfo) error { +func (m *Meta) UpdateTable(dbID int64, tableInfo *model.TableInfo) error { // first check db exists or not. dbKey := m.dbKey(dbID) if err := m.checkDBExists(dbKey); err != nil { @@ -320,7 +326,7 @@ func (m *TMeta) UpdateTable(dbID int64, tableInfo *model.TableInfo) error { } // ListTables shows all tables in database. -func (m *TMeta) ListTables(dbID int64) ([]*model.TableInfo, error) { +func (m *Meta) ListTables(dbID int64) ([]*model.TableInfo, error) { dbKey := m.dbKey(dbID) if err := m.checkDBExists(dbKey); err != nil { return nil, errors.Trace(err) @@ -352,7 +358,7 @@ func (m *TMeta) ListTables(dbID int64) ([]*model.TableInfo, error) { } // ListDatabases shows all databases. -func (m *TMeta) ListDatabases() ([]*model.DBInfo, error) { +func (m *Meta) ListDatabases() ([]*model.DBInfo, error) { res, err := m.txn.HGetAll(mDBs) if err != nil { return nil, errors.Trace(err) @@ -371,7 +377,7 @@ func (m *TMeta) ListDatabases() ([]*model.DBInfo, error) { } // GetDatabase gets the database value with ID. -func (m *TMeta) GetDatabase(dbID int64) (*model.DBInfo, error) { +func (m *Meta) GetDatabase(dbID int64) (*model.DBInfo, error) { dbKey := m.dbKey(dbID) value, err := m.txn.HGet(mDBs, dbKey) if err != nil || value == nil { @@ -384,7 +390,7 @@ func (m *TMeta) GetDatabase(dbID int64) (*model.DBInfo, error) { } // GetTable gets the table value in database with tableID. -func (m *TMeta) GetTable(dbID int64, tableID int64) (*model.TableInfo, error) { +func (m *Meta) GetTable(dbID int64, tableID int64) (*model.TableInfo, error) { // first check db exists or not. dbKey := m.dbKey(dbID) if err := m.checkDBExists(dbKey); err != nil { @@ -418,7 +424,7 @@ var ( ) // GetDDLOwner gets the current owner for DDL. -func (m *TMeta) GetDDLOwner() (*model.Owner, error) { +func (m *Meta) GetDDLOwner() (*model.Owner, error) { value, err := m.txn.Get(mDDLOwnerKey) if err != nil || value == nil { return nil, errors.Trace(err) @@ -430,7 +436,7 @@ func (m *TMeta) GetDDLOwner() (*model.Owner, error) { } // SetDDLOwner sets the current owner for DDL. -func (m *TMeta) SetDDLOwner(o *model.Owner) error { +func (m *Meta) SetDDLOwner(o *model.Owner) error { b, err := json.Marshal(o) if err != nil { return errors.Trace(err) @@ -439,7 +445,7 @@ func (m *TMeta) SetDDLOwner(o *model.Owner) error { } // EnQueueDDLJob adds a DDL job to the list. -func (m *TMeta) EnQueueDDLJob(job *model.Job) error { +func (m *Meta) EnQueueDDLJob(job *model.Job) error { b, err := json.Marshal(job) if err != nil { return errors.Trace(err) @@ -448,7 +454,7 @@ func (m *TMeta) EnQueueDDLJob(job *model.Job) error { } // DeQueueDDLJob pops a DDL job from the list. -func (m *TMeta) DeQueueDDLJob() (*model.Job, error) { +func (m *Meta) DeQueueDDLJob() (*model.Job, error) { value, err := m.txn.LPop(mDDLJobListKey) if err != nil || value == nil { return nil, errors.Trace(err) @@ -460,7 +466,7 @@ func (m *TMeta) DeQueueDDLJob() (*model.Job, error) { } // GetDDLJob returns the DDL job with index. -func (m *TMeta) GetDDLJob(index int64) (*model.Job, error) { +func (m *Meta) GetDDLJob(index int64) (*model.Job, error) { value, err := m.txn.LIndex(mDDLJobListKey, index) if err != nil || value == nil { return nil, errors.Trace(err) @@ -472,7 +478,7 @@ func (m *TMeta) GetDDLJob(index int64) (*model.Job, error) { } // UpdateDDLJob updates the DDL job with index. -func (m *TMeta) UpdateDDLJob(index int64, job *model.Job) error { +func (m *Meta) UpdateDDLJob(index int64, job *model.Job) error { b, err := json.Marshal(job) if err != nil { return errors.Trace(err) @@ -481,18 +487,18 @@ func (m *TMeta) UpdateDDLJob(index int64, job *model.Job) error { } // DDLJobLength returns the DDL job length. -func (m *TMeta) DDLJobLength() (int64, error) { +func (m *Meta) DDLJobLength() (int64, error) { return m.txn.LLen(mDDLJobListKey) } -func (m *TMeta) jobIDKey(id int64) []byte { +func (m *Meta) jobIDKey(id int64) []byte { b := make([]byte, 8) binary.BigEndian.PutUint64(b, uint64(id)) return b } // AddHistoryDDLJob adds DDL job to history. -func (m *TMeta) AddHistoryDDLJob(job *model.Job) error { +func (m *Meta) AddHistoryDDLJob(job *model.Job) error { b, err := json.Marshal(job) if err != nil { return errors.Trace(err) @@ -501,7 +507,7 @@ func (m *TMeta) AddHistoryDDLJob(job *model.Job) error { } // GetHistoryDDLJob gets a history DDL job. -func (m *TMeta) GetHistoryDDLJob(id int64) (*model.Job, error) { +func (m *Meta) GetHistoryDDLJob(id int64) (*model.Job, error) { value, err := m.txn.HGet(mDDLJobHistoryKey, m.jobIDKey(id)) if err != nil || value == nil { return nil, errors.Trace(err)