Merge pull request #480 from pingcap/siddontang/cleanup-meta

cleanup meta
This commit is contained in:
goroutine
2015-10-29 18:53:42 +08:00
11 changed files with 148 additions and 250 deletions

View File

@ -62,7 +62,7 @@ type DDL interface {
type ddl struct {
infoHandle *infoschema.Handle
onDDLChange OnDDLChange
meta *meta.Meta
store kv.Storage
// schema lease seconds.
lease int
uuid string
@ -78,7 +78,7 @@ func NewDDL(store kv.Storage, infoHandle *infoschema.Handle, hook OnDDLChange, l
d := &ddl{
infoHandle: infoHandle,
onDDLChange: hook,
meta: meta.NewMeta(store),
store: store,
lease: lease,
uuid: uuid.NewV4().String(),
jobCh: make(chan struct{}, 1),
@ -92,6 +92,17 @@ func (d *ddl) GetInformationSchema() infoschema.InfoSchema {
return d.infoHandle.Get()
}
func (d *ddl) genGlobalID() (int64, error) {
var globalID int64
err := kv.RunInNewTxn(d.store, true, func(txn kv.Transaction) error {
var err error
globalID, err = meta.NewMeta(txn).GenGlobalID()
return errors.Trace(err)
})
return globalID, errors.Trace(err)
}
func (d *ddl) CreateSchema(ctx context.Context, schema model.CIStr) (err error) {
is := d.GetInformationSchema()
_, ok := is.SchemaByName(schema)
@ -99,12 +110,13 @@ func (d *ddl) CreateSchema(ctx context.Context, schema model.CIStr) (err error)
return errors.Trace(ErrExists)
}
info := &model.DBInfo{Name: schema}
info.ID, err = d.meta.GenGlobalID()
info.ID, err = d.genGlobalID()
if err != nil {
return errors.Trace(err)
}
err = d.meta.RunInNewTxn(false, func(t *meta.TMeta) error {
err = kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error {
t := meta.NewMeta(txn)
err := d.verifySchemaMetaVersion(t, is.SchemaMetaVersion())
if err != nil {
return errors.Trace(err)
@ -121,7 +133,7 @@ func (d *ddl) CreateSchema(ctx context.Context, schema model.CIStr) (err error)
return errors.Trace(err)
}
func (d *ddl) verifySchemaMetaVersion(t *meta.TMeta, schemaMetaVersion int64) error {
func (d *ddl) verifySchemaMetaVersion(t *meta.Meta, schemaMetaVersion int64) error {
curVer, err := t.GetSchemaVersion()
if err != nil {
return errors.Trace(err)
@ -172,7 +184,8 @@ func (d *ddl) DropSchema(ctx context.Context, schema model.CIStr) (err error) {
}
}
err = d.meta.RunInNewTxn(false, func(t *meta.TMeta) error {
err = kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error {
t := meta.NewMeta(txn)
err := d.verifySchemaMetaVersion(t, is.SchemaMetaVersion())
if err != nil {
return errors.Trace(err)
@ -276,7 +289,7 @@ func (d *ddl) buildColumnAndConstraint(offset int, colDef *coldef.ColumnDef) (*c
if err != nil {
return nil, nil, errors.Trace(err)
}
col.ID, err = d.meta.GenGlobalID()
col.ID, err = d.genGlobalID()
if err != nil {
return nil, nil, errors.Trace(err)
}
@ -331,7 +344,7 @@ func (d *ddl) buildTableInfo(tableName model.CIStr, cols []*column.Col, constrai
tbInfo = &model.TableInfo{
Name: tableName,
}
tbInfo.ID, err = d.meta.GenGlobalID()
tbInfo.ID, err = d.genGlobalID()
if err != nil {
return nil, errors.Trace(err)
}
@ -399,7 +412,8 @@ func (d *ddl) CreateTable(ctx context.Context, ident table.Ident, colDefs []*col
}
log.Infof("New table: %+v", tbInfo)
err = d.meta.RunInNewTxn(false, func(t *meta.TMeta) error {
err = kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error {
t := meta.NewMeta(txn)
err := d.verifySchemaMetaVersion(t, is.SchemaMetaVersion())
if err != nil {
return errors.Trace(err)
@ -500,7 +514,8 @@ func (d *ddl) addColumn(ctx context.Context, schema *model.DBInfo, tbl table.Tab
}
// update infomation schema
err = d.meta.RunInNewTxn(false, func(t *meta.TMeta) error {
err = kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error {
t := meta.NewMeta(txn)
err := d.verifySchemaMetaVersion(t, schemaMetaVersion)
if err != nil {
return errors.Trace(err)
@ -562,7 +577,8 @@ func (d *ddl) DropTable(ctx context.Context, ti table.Ident) (err error) {
return errors.Trace(err)
}
err = d.meta.RunInNewTxn(false, func(t *meta.TMeta) error {
err = kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error {
t := meta.NewMeta(txn)
err := d.verifySchemaMetaVersion(t, is.SchemaMetaVersion())
if err != nil {
return errors.Trace(err)
@ -659,7 +675,8 @@ func (d *ddl) CreateIndex(ctx context.Context, ti table.Ident, unique bool, inde
}
// update InfoSchema
err = d.meta.RunInNewTxn(false, func(t *meta.TMeta) error {
err = kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error {
t := meta.NewMeta(txn)
err := d.verifySchemaMetaVersion(t, is.SchemaMetaVersion())
if err != nil {
return errors.Trace(err)

View File

@ -19,6 +19,7 @@ import (
"github.com/juju/errors"
"github.com/ngaut/log"
"github.com/pingcap/tidb/context"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/model"
)
@ -30,7 +31,8 @@ func (d *ddl) startJob(ctx context.Context, job *model.Job) error {
}
// Create a new job and queue it.
err := d.meta.RunInNewTxn(false, func(t *meta.TMeta) error {
err := kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error {
t := meta.NewMeta(txn)
var err error
job.ID, err = t.GenGlobalID()
if err != nil {
@ -78,7 +80,8 @@ func (d *ddl) startJob(ctx context.Context, job *model.Job) error {
func (d *ddl) getHistoryJob(id int64) (*model.Job, error) {
var job *model.Job
err := d.meta.RunInNewTxn(false, func(t *meta.TMeta) error {
err := kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error {
t := meta.NewMeta(txn)
var err1 error
job, err1 = t.GetHistoryDDLJob(id)
return errors.Trace(err1)
@ -94,7 +97,7 @@ func asyncNotify(ch chan struct{}) {
}
}
func (d *ddl) verifyOwner(t *meta.TMeta) error {
func (d *ddl) verifyOwner(t *meta.Meta) error {
owner, err := t.GetDDLOwner()
if err != nil {
return errors.Trace(err)
@ -126,7 +129,7 @@ func (d *ddl) verifyOwner(t *meta.TMeta) error {
}
// every time we enter another state, we must call this function.
func (d *ddl) updateJob(t *meta.TMeta, job *model.Job) error {
func (d *ddl) updateJob(t *meta.Meta, job *model.Job) error {
err := d.verifyOwner(t)
if err != nil {
return errors.Trace(err)
@ -139,7 +142,9 @@ func (d *ddl) updateJob(t *meta.TMeta, job *model.Job) error {
func (d *ddl) getFirstJob() (*model.Job, error) {
var job *model.Job
err := d.meta.RunInNewTxn(true, func(t *meta.TMeta) error {
err := kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error {
t := meta.NewMeta(txn)
var err1 error
job, err1 = t.GetDDLJob(0)
return errors.Trace(err1)
@ -150,7 +155,8 @@ func (d *ddl) getFirstJob() (*model.Job, error) {
func (d *ddl) finishJob(job *model.Job) error {
// done, notice and run next job.
err := d.meta.RunInNewTxn(false, func(t *meta.TMeta) error {
err := kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error {
t := meta.NewMeta(txn)
err := d.verifyOwner(t)
if err != nil {
return errors.Trace(err)
@ -169,7 +175,8 @@ func (d *ddl) finishJob(job *model.Job) error {
}
func (d *ddl) checkOwner() error {
err := d.meta.RunInNewTxn(false, func(t *meta.TMeta) error {
err := kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error {
t := meta.NewMeta(txn)
return errors.Trace(d.verifyOwner(t))
})

View File

@ -30,11 +30,11 @@ type Domain struct {
store kv.Storage
infoHandle *infoschema.Handle
ddl ddl.DDL
meta *meta.Meta
lease int
}
func (do *Domain) loadInfoSchema(m *meta.TMeta) (err error) {
func (do *Domain) loadInfoSchema(txn kv.Transaction) (err error) {
m := meta.NewMeta(txn)
schemaMetaVersion, err := m.GetSchemaVersion()
if err != nil {
return errors.Trace(err)
@ -90,7 +90,7 @@ func (do *Domain) onDDLChange(err error) error {
}
func (do *Domain) reload() error {
err := do.meta.RunInNewTxn(false, do.loadInfoSchema)
err := kv.RunInNewTxn(do.store, false, do.loadInfoSchema)
return errors.Trace(err)
}
@ -116,7 +116,6 @@ func (do *Domain) loadSchemaInLoop() {
func NewDomain(store kv.Storage, lease int) (d *Domain, err error) {
d = &Domain{
store: store,
meta: meta.NewMeta(store),
lease: lease,
}

View File

@ -18,7 +18,6 @@ import (
"github.com/juju/errors"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/model"
"github.com/pingcap/tidb/table"
@ -191,13 +190,13 @@ func (is *infoSchema) Clone() (result []*model.DBInfo) {
// Handle handles information schema, including getting and setting.
type Handle struct {
value atomic.Value
meta *meta.Meta
store kv.Storage
}
// NewHandle creates a new Handle.
func NewHandle(store kv.Storage) *Handle {
return &Handle{
meta: meta.NewMeta(store),
store: store,
}
}
@ -218,7 +217,7 @@ func (h *Handle) Set(newInfo []*model.DBInfo, schemaMetaVersion int64) {
info.schemas[di.ID] = di
info.schemaNameToID[di.Name.L] = di.ID
for _, t := range di.Tables {
alloc := autoid.NewAllocator(h.meta, di.ID)
alloc := autoid.NewAllocator(h.store, di.ID)
info.tables[t.ID] = table.TableFromMeta(di.Name.L, alloc, t)
tname := tableName{di.Name.L, t.Name.L}
info.tableNameToID[tname] = t.ID

View File

@ -18,6 +18,7 @@ import (
"github.com/juju/errors"
"github.com/ngaut/log"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
)
@ -32,11 +33,11 @@ type Allocator interface {
}
type allocator struct {
mu sync.Mutex
base int64
end int64
m *meta.Meta
dbID int64
mu sync.Mutex
base int64
end int64
store kv.Storage
dbID int64
}
// Alloc allocs the next autoID for table with tableID.
@ -48,7 +49,8 @@ func (alloc *allocator) Alloc(tableID int64) (int64, error) {
alloc.mu.Lock()
defer alloc.mu.Unlock()
if alloc.base == alloc.end { // step
err := alloc.m.RunInNewTxn(true, func(m *meta.TMeta) error {
err := kv.RunInNewTxn(alloc.store, true, func(txn kv.Transaction) error {
m := meta.NewMeta(txn)
// err1 is used for passing `go tool vet --shadow` check.
end, err1 := m.GenAutoTableID(alloc.dbID, tableID, step)
if err1 != nil {
@ -71,9 +73,9 @@ func (alloc *allocator) Alloc(tableID int64) (int64, error) {
}
// NewAllocator returns a new auto increment id generator on the store.
func NewAllocator(m *meta.Meta, dbID int64) Allocator {
func NewAllocator(store kv.Storage, dbID int64) Allocator {
return &allocator{
m: m,
dbID: dbID,
store: store,
dbID: dbID,
}
}

View File

@ -17,6 +17,7 @@ import (
"testing"
. "github.com/pingcap/check"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/model"
@ -39,8 +40,8 @@ func (*testSuite) TestT(c *C) {
c.Assert(err, IsNil)
defer store.Close()
m := meta.NewMeta(store)
err = m.RunInNewTxn(false, func(m *meta.TMeta) error {
err = kv.RunInNewTxn(store, false, func(txn kv.Transaction) error {
m := meta.NewMeta(txn)
err = m.CreateDatabase(&model.DBInfo{ID: 1, Name: model.NewCIStr("a")})
c.Assert(err, IsNil)
err = m.CreateTable(1, &model.TableInfo{ID: 1, Name: model.NewCIStr("t")})
@ -49,7 +50,7 @@ func (*testSuite) TestT(c *C) {
})
c.Assert(err, IsNil)
alloc := autoid.NewAllocator(m, 1)
alloc := autoid.NewAllocator(store, 1)
c.Assert(alloc, NotNil)
id, err := alloc.Alloc(1)

View File

@ -66,64 +66,19 @@ var (
ErrTableNotExists = errors.New("table doesn't exist")
)
// Meta is the structure saving meta information.
// Meta is for handling meta information in a transaction.
type Meta struct {
store *structure.TStore
}
// TMeta is for handling meta information in a transaction.
type TMeta struct {
txn *structure.TxStructure
}
// NewMeta creates a Meta with kv storage.
func NewMeta(store kv.Storage) *Meta {
m := &Meta{
store: structure.NewStore(store, []byte{0x00}),
}
return m
}
// Begin creates a TMeta object and you can handle meta information in a transaction.
func (m *Meta) Begin() (*TMeta, error) {
t := &TMeta{}
var err error
t.txn, err = m.store.Begin()
if err != nil {
return nil, errors.Trace(err)
}
return t, nil
}
// RunInNewTxn runs fn in a new transaction.
func (m *Meta) RunInNewTxn(retryable bool, f func(t *TMeta) error) error {
fn := func(txn *structure.TxStructure) error {
t := &TMeta{txn: txn}
return errors.Trace(f(t))
}
err := m.store.RunInNewTxn(retryable, fn)
return errors.Trace(err)
// NewMeta creates a Meta in transaction txn.
func NewMeta(txn kv.Transaction) *Meta {
t := structure.NewStructure(txn, []byte{0x00})
return &Meta{txn: t}
}
// GenGlobalID generates next id globally.
func (m *Meta) GenGlobalID() (int64, error) {
var (
id int64
err error
)
err1 := m.RunInNewTxn(true, func(t *TMeta) error {
id, err = t.GenGlobalID()
return errors.Trace(err)
})
return id, errors.Trace(err1)
}
// GenGlobalID generates next id globally.
func (m *TMeta) GenGlobalID() (int64, error) {
globalIDMutex.Lock()
defer globalIDMutex.Unlock()
@ -131,15 +86,15 @@ func (m *TMeta) GenGlobalID() (int64, error) {
}
// GetGlobalID gets current global id.
func (m *TMeta) GetGlobalID() (int64, error) {
func (m *Meta) GetGlobalID() (int64, error) {
return m.txn.GetInt64(mNextGlobalIDKey)
}
func (m *TMeta) dbKey(dbID int64) []byte {
func (m *Meta) dbKey(dbID int64) []byte {
return []byte(fmt.Sprintf("%s:%d", mDBPrefix, dbID))
}
func (m *TMeta) parseDatabaseID(key string) (int64, error) {
func (m *Meta) parseDatabaseID(key string) (int64, error) {
seps := strings.Split(key, ":")
if len(seps) != 2 {
return 0, errors.Errorf("invalid db key %s", key)
@ -149,15 +104,15 @@ func (m *TMeta) parseDatabaseID(key string) (int64, error) {
return n, errors.Trace(err)
}
func (m *TMeta) autoTalbeIDKey(tableID int64) []byte {
func (m *Meta) autoTalbeIDKey(tableID int64) []byte {
return []byte(fmt.Sprintf("%s:%d", mTableIDPrefix, tableID))
}
func (m *TMeta) tableKey(tableID int64) []byte {
func (m *Meta) tableKey(tableID int64) []byte {
return []byte(fmt.Sprintf("%s:%d", mTablePrefix, tableID))
}
func (m *TMeta) parseTableID(key string) (int64, error) {
func (m *Meta) parseTableID(key string) (int64, error) {
seps := strings.Split(key, ":")
if len(seps) != 2 {
return 0, errors.Errorf("invalid table meta key %s", key)
@ -168,7 +123,7 @@ func (m *TMeta) parseTableID(key string) (int64, error) {
}
// GenAutoTableID adds step to the auto id of the table and returns the sum.
func (m *TMeta) GenAutoTableID(dbID int64, tableID int64, step int64) (int64, error) {
func (m *Meta) GenAutoTableID(dbID int64, tableID int64, step int64) (int64, error) {
// check db exists
dbKey := m.dbKey(dbID)
if err := m.checkDBExists(dbKey); err != nil {
@ -185,21 +140,21 @@ func (m *TMeta) GenAutoTableID(dbID int64, tableID int64, step int64) (int64, er
}
// GetAutoTableID gets current auto id with table id.
func (m *TMeta) GetAutoTableID(dbID int64, tableID int64) (int64, error) {
func (m *Meta) GetAutoTableID(dbID int64, tableID int64) (int64, error) {
return m.txn.HGetInt64(m.dbKey(dbID), m.autoTalbeIDKey(tableID))
}
// GetSchemaVersion gets current global schema version.
func (m *TMeta) GetSchemaVersion() (int64, error) {
func (m *Meta) GetSchemaVersion() (int64, error) {
return m.txn.GetInt64(mSchemaVersionKey)
}
// GenSchemaVersion generates next schema version.
func (m *TMeta) GenSchemaVersion() (int64, error) {
func (m *Meta) GenSchemaVersion() (int64, error) {
return m.txn.Inc(mSchemaVersionKey, 1)
}
func (m *TMeta) checkDBExists(dbKey []byte) error {
func (m *Meta) checkDBExists(dbKey []byte) error {
v, err := m.txn.HGet(mDBs, dbKey)
if err != nil {
return errors.Trace(err)
@ -210,33 +165,39 @@ func (m *TMeta) checkDBExists(dbKey []byte) error {
return nil
}
func (m *TMeta) checkDBNotExists(dbKey []byte) error {
func (m *Meta) checkDBNotExists(dbKey []byte) error {
v, err := m.txn.HGet(mDBs, dbKey)
if err != nil {
return errors.Trace(err)
} else if v != nil {
}
if v != nil {
return ErrDBExists
}
return nil
}
func (m *TMeta) checkTableExists(dbKey []byte, tableKey []byte) error {
func (m *Meta) checkTableExists(dbKey []byte, tableKey []byte) error {
v, err := m.txn.HGet(dbKey, tableKey)
if err != nil {
return errors.Trace(err)
} else if v == nil {
}
if v == nil {
return ErrTableNotExists
}
return nil
}
func (m *TMeta) checkTableNotExists(dbKey []byte, tableKey []byte) error {
func (m *Meta) checkTableNotExists(dbKey []byte, tableKey []byte) error {
v, err := m.txn.HGet(dbKey, tableKey)
if err != nil {
return errors.Trace(err)
} else if v != nil {
}
if v != nil {
return ErrTableExists
}
@ -244,7 +205,7 @@ func (m *TMeta) checkTableNotExists(dbKey []byte, tableKey []byte) error {
}
// CreateDatabase creates a database with db info.
func (m *TMeta) CreateDatabase(dbInfo *model.DBInfo) error {
func (m *Meta) CreateDatabase(dbInfo *model.DBInfo) error {
dbKey := m.dbKey(dbInfo.ID)
if err := m.checkDBNotExists(dbKey); err != nil {
@ -260,7 +221,7 @@ func (m *TMeta) CreateDatabase(dbInfo *model.DBInfo) error {
}
// UpdateDatabase updates a database with db info.
func (m *TMeta) UpdateDatabase(dbInfo *model.DBInfo) error {
func (m *Meta) UpdateDatabase(dbInfo *model.DBInfo) error {
dbKey := m.dbKey(dbInfo.ID)
if err := m.checkDBExists(dbKey); err != nil {
@ -276,7 +237,7 @@ func (m *TMeta) UpdateDatabase(dbInfo *model.DBInfo) error {
}
// CreateTable creates a table with tableInfo in database.
func (m *TMeta) CreateTable(dbID int64, tableInfo *model.TableInfo) error {
func (m *Meta) CreateTable(dbID int64, tableInfo *model.TableInfo) error {
// first check db exists or not.
dbKey := m.dbKey(dbID)
if err := m.checkDBExists(dbKey); err != nil {
@ -298,7 +259,7 @@ func (m *TMeta) CreateTable(dbID int64, tableInfo *model.TableInfo) error {
}
// DropDatabase drops whole database.
func (m *TMeta) DropDatabase(dbID int64) error {
func (m *Meta) DropDatabase(dbID int64) error {
// first check db exists or not.
dbKey := m.dbKey(dbID)
@ -314,7 +275,7 @@ func (m *TMeta) DropDatabase(dbID int64) error {
}
// DropTable drops table in database.
func (m *TMeta) DropTable(dbID int64, tableID int64) error {
func (m *Meta) DropTable(dbID int64, tableID int64) error {
// first check db exists or not.
dbKey := m.dbKey(dbID)
if err := m.checkDBExists(dbKey); err != nil {
@ -340,7 +301,7 @@ func (m *TMeta) DropTable(dbID int64, tableID int64) error {
}
// UpdateTable updates the table with table info.
func (m *TMeta) UpdateTable(dbID int64, tableInfo *model.TableInfo) error {
func (m *Meta) UpdateTable(dbID int64, tableInfo *model.TableInfo) error {
// first check db exists or not.
dbKey := m.dbKey(dbID)
if err := m.checkDBExists(dbKey); err != nil {
@ -365,7 +326,7 @@ func (m *TMeta) UpdateTable(dbID int64, tableInfo *model.TableInfo) error {
}
// ListTables shows all tables in database.
func (m *TMeta) ListTables(dbID int64) ([]*model.TableInfo, error) {
func (m *Meta) ListTables(dbID int64) ([]*model.TableInfo, error) {
dbKey := m.dbKey(dbID)
if err := m.checkDBExists(dbKey); err != nil {
return nil, errors.Trace(err)
@ -397,7 +358,7 @@ func (m *TMeta) ListTables(dbID int64) ([]*model.TableInfo, error) {
}
// ListDatabases shows all databases.
func (m *TMeta) ListDatabases() ([]*model.DBInfo, error) {
func (m *Meta) ListDatabases() ([]*model.DBInfo, error) {
res, err := m.txn.HGetAll(mDBs)
if err != nil {
return nil, errors.Trace(err)
@ -416,7 +377,7 @@ func (m *TMeta) ListDatabases() ([]*model.DBInfo, error) {
}
// GetDatabase gets the database value with ID.
func (m *TMeta) GetDatabase(dbID int64) (*model.DBInfo, error) {
func (m *Meta) GetDatabase(dbID int64) (*model.DBInfo, error) {
dbKey := m.dbKey(dbID)
value, err := m.txn.HGet(mDBs, dbKey)
if err != nil || value == nil {
@ -429,7 +390,7 @@ func (m *TMeta) GetDatabase(dbID int64) (*model.DBInfo, error) {
}
// GetTable gets the table value in database with tableID.
func (m *TMeta) GetTable(dbID int64, tableID int64) (*model.TableInfo, error) {
func (m *Meta) GetTable(dbID int64, tableID int64) (*model.TableInfo, error) {
// first check db exists or not.
dbKey := m.dbKey(dbID)
if err := m.checkDBExists(dbKey); err != nil {
@ -463,7 +424,7 @@ var (
)
// GetDDLOwner gets the current owner for DDL.
func (m *TMeta) GetDDLOwner() (*model.Owner, error) {
func (m *Meta) GetDDLOwner() (*model.Owner, error) {
value, err := m.txn.Get(mDDLOwnerKey)
if err != nil || value == nil {
return nil, errors.Trace(err)
@ -475,7 +436,7 @@ func (m *TMeta) GetDDLOwner() (*model.Owner, error) {
}
// SetDDLOwner sets the current owner for DDL.
func (m *TMeta) SetDDLOwner(o *model.Owner) error {
func (m *Meta) SetDDLOwner(o *model.Owner) error {
b, err := json.Marshal(o)
if err != nil {
return errors.Trace(err)
@ -484,7 +445,7 @@ func (m *TMeta) SetDDLOwner(o *model.Owner) error {
}
// EnQueueDDLJob adds a DDL job to the list.
func (m *TMeta) EnQueueDDLJob(job *model.Job) error {
func (m *Meta) EnQueueDDLJob(job *model.Job) error {
b, err := json.Marshal(job)
if err != nil {
return errors.Trace(err)
@ -493,7 +454,7 @@ func (m *TMeta) EnQueueDDLJob(job *model.Job) error {
}
// DeQueueDDLJob pops a DDL job from the list.
func (m *TMeta) DeQueueDDLJob() (*model.Job, error) {
func (m *Meta) DeQueueDDLJob() (*model.Job, error) {
value, err := m.txn.LPop(mDDLJobListKey)
if err != nil || value == nil {
return nil, errors.Trace(err)
@ -505,7 +466,7 @@ func (m *TMeta) DeQueueDDLJob() (*model.Job, error) {
}
// GetDDLJob returns the DDL job with index.
func (m *TMeta) GetDDLJob(index int64) (*model.Job, error) {
func (m *Meta) GetDDLJob(index int64) (*model.Job, error) {
value, err := m.txn.LIndex(mDDLJobListKey, index)
if err != nil || value == nil {
return nil, errors.Trace(err)
@ -517,7 +478,7 @@ func (m *TMeta) GetDDLJob(index int64) (*model.Job, error) {
}
// UpdateDDLJob updates the DDL job with index.
func (m *TMeta) UpdateDDLJob(index int64, job *model.Job) error {
func (m *Meta) UpdateDDLJob(index int64, job *model.Job) error {
b, err := json.Marshal(job)
if err != nil {
return errors.Trace(err)
@ -526,18 +487,18 @@ func (m *TMeta) UpdateDDLJob(index int64, job *model.Job) error {
}
// DDLJobLength returns the DDL job length.
func (m *TMeta) DDLJobLength() (int64, error) {
func (m *Meta) DDLJobLength() (int64, error) {
return m.txn.LLen(mDDLJobListKey)
}
func (m *TMeta) jobIDKey(id int64) []byte {
func (m *Meta) jobIDKey(id int64) []byte {
b := make([]byte, 8)
binary.BigEndian.PutUint64(b, uint64(id))
return b
}
// AddHistoryDDLJob adds DDL job to history.
func (m *TMeta) AddHistoryDDLJob(job *model.Job) error {
func (m *Meta) AddHistoryDDLJob(job *model.Job) error {
b, err := json.Marshal(job)
if err != nil {
return errors.Trace(err)
@ -546,7 +507,7 @@ func (m *TMeta) AddHistoryDDLJob(job *model.Job) error {
}
// GetHistoryDDLJob gets a history DDL job.
func (m *TMeta) GetHistoryDDLJob(id int64) (*model.Job, error) {
func (m *Meta) GetHistoryDDLJob(id int64) (*model.Job, error) {
value, err := m.txn.HGet(mDDLJobHistoryKey, m.jobIDKey(id))
if err != nil || value == nil {
return nil, errors.Trace(err)
@ -556,13 +517,3 @@ func (m *TMeta) GetHistoryDDLJob(id int64) (*model.Job, error) {
err = json.Unmarshal(value, job)
return job, errors.Trace(err)
}
// Commit commits the transaction.
func (m *TMeta) Commit() error {
return m.txn.Commit()
}
// Rollback rolls back the transaction.
func (m *TMeta) Rollback() error {
return m.txn.Rollback()
}

View File

@ -38,12 +38,12 @@ func (s *testSuite) TestMeta(c *C) {
c.Assert(err, IsNil)
defer store.Close()
m := meta.NewMeta(store)
t, err := m.Begin()
txn, err := store.Begin()
c.Assert(err, IsNil)
defer t.Rollback()
defer txn.Rollback()
t := meta.NewMeta(txn)
n, err := t.GenGlobalID()
c.Assert(err, IsNil)
@ -146,28 +146,8 @@ func (s *testSuite) TestMeta(c *C) {
c.Assert(err, IsNil)
c.Assert(dbs, HasLen, 0)
err = t.Commit()
err = txn.Commit()
c.Assert(err, IsNil)
fn := func(m *meta.TMeta) error {
n, err = m.GenSchemaVersion()
c.Assert(err, IsNil)
var n1 int64
n1, err = m.GetSchemaVersion()
c.Assert(err, IsNil)
c.Assert(n, Equals, n1)
return nil
}
err = m.RunInNewTxn(false, fn)
c.Assert(err, IsNil)
n, err = m.GenGlobalID()
c.Assert(err, IsNil)
n1, err := m.GenGlobalID()
c.Assert(err, IsNil)
c.Assert(n1, Equals, n+1)
}
func (s *testSuite) TestDDL(c *C) {
@ -176,12 +156,12 @@ func (s *testSuite) TestDDL(c *C) {
c.Assert(err, IsNil)
defer store.Close()
m := meta.NewMeta(store)
t, err := m.Begin()
txn, err := store.Begin()
c.Assert(err, IsNil)
defer t.Rollback()
defer txn.Rollback()
t := meta.NewMeta(txn)
owner := &model.Owner{OwnerID: "1"}
err = t.SetDDLOwner(owner)
@ -220,6 +200,6 @@ func (s *testSuite) TestDDL(c *C) {
c.Assert(err, IsNil)
c.Assert(v, DeepEquals, job)
err = t.Commit()
err = txn.Commit()
c.Assert(err, IsNil)
}

View File

@ -56,10 +56,7 @@ func (t *TxStructure) GetInt64(key []byte) (int64, error) {
// the value after the increment.
func (t *TxStructure) Inc(key []byte, step int64) (int64, error) {
ek := t.encodeStringDataKey(key)
if err := t.txn.LockKeys(ek); err != nil {
return 0, errors.Trace(err)
}
// txn Inc will lock this key, so we don't lock it here.
n, err := t.txn.Inc(ek, step)
if errors2.ErrorEqual(err, kv.ErrNotExist) {
err = nil

View File

@ -13,78 +13,19 @@
package structure
import (
"github.com/juju/errors"
"github.com/pingcap/tidb/kv"
)
import "github.com/pingcap/tidb/kv"
// ErrTxDone is the error returned after transaction has already been committed or rolled back.
var ErrTxDone = errors.New("Transaction has already been committed or rolled back")
// TStore is the storage for data structure.
type TStore struct {
store kv.Storage
prefix []byte
}
// NewStore creates a TStore with kv storage and special key prefix.
func NewStore(store kv.Storage, prefix []byte) *TStore {
s := &TStore{
store: store,
// NewStructure creates a TxStructure in transaction txn and with key prefix.
func NewStructure(txn kv.Transaction, prefix []byte) *TxStructure {
return &TxStructure{
txn: txn,
prefix: prefix,
}
return s
}
// Begin creates a TxStructure for calling structure APIs in a transaction later.
func (s *TStore) Begin() (*TxStructure, error) {
t := &TxStructure{done: false, prefix: s.prefix}
var err error
t.txn, err = s.store.Begin()
if err != nil {
return nil, errors.Trace(err)
}
return t, nil
}
// RunInNewTxn runs f in a new transaction
func (s *TStore) RunInNewTxn(retryable bool, f func(t *TxStructure) error) error {
fn := func(txn kv.Transaction) error {
t := &TxStructure{done: false, prefix: s.prefix, txn: txn}
return errors.Trace(f(t))
}
err := kv.RunInNewTxn(s.store, retryable, fn)
return errors.Trace(err)
}
// TxStructure supports some simple data structures like string, hash, list, etc... and
// you can use these in a transaction.
type TxStructure struct {
txn kv.Transaction
done bool
prefix []byte
}
// Commit commits the transaction.
func (t *TxStructure) Commit() error {
if t.done {
return ErrTxDone
}
t.done = true
return errors.Trace(t.txn.Commit())
}
// Rollback rolls back the transaction.
func (t *TxStructure) Rollback() error {
if t.done {
return ErrTxDone
}
t.done = true
return errors.Trace(t.txn.Rollback())
}

View File

@ -17,18 +17,19 @@ import (
"testing"
. "github.com/pingcap/check"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/localstore"
"github.com/pingcap/tidb/store/localstore/goleveldb"
)
func TesTxStructure(t *testing.T) {
func TestTxStructure(t *testing.T) {
TestingT(t)
}
var _ = Suite(&tesTxStructureSuite{})
type tesTxStructureSuite struct {
s *TStore
store kv.Storage
}
func (s *tesTxStructureSuite) SetUpSuite(c *C) {
@ -38,19 +39,20 @@ func (s *tesTxStructureSuite) SetUpSuite(c *C) {
}
store, err := d.Open(path)
c.Assert(err, IsNil)
s.s = NewStore(store, []byte{0x00})
s.store = store
}
func (s *tesTxStructureSuite) TearDownSuite(c *C) {
err := s.s.store.Close()
err := s.store.Close()
c.Assert(err, IsNil)
}
func (s *tesTxStructureSuite) TestString(c *C) {
tx, err := s.s.Begin()
txn, err := s.store.Begin()
c.Assert(err, IsNil)
defer txn.Rollback()
defer tx.Rollback()
tx := NewStructure(txn, []byte{0x00})
key := []byte("a")
value := []byte("1")
@ -80,15 +82,16 @@ func (s *tesTxStructureSuite) TestString(c *C) {
c.Assert(err, IsNil)
c.Assert(v, IsNil)
err = tx.Commit()
err = txn.Commit()
c.Assert(err, IsNil)
}
func (s *tesTxStructureSuite) TestList(c *C) {
tx, err := s.s.Begin()
txn, err := s.store.Begin()
c.Assert(err, IsNil)
defer txn.Rollback()
defer tx.Rollback()
tx := NewStructure(txn, []byte{0x00})
key := []byte("a")
err = tx.LPush(key, []byte("3"), []byte("2"), []byte("1"))
@ -161,15 +164,16 @@ func (s *tesTxStructureSuite) TestList(c *C) {
c.Assert(err, IsNil)
c.Assert(l, Equals, int64(0))
err = tx.Commit()
err = txn.Commit()
c.Assert(err, IsNil)
}
func (s *tesTxStructureSuite) TestHash(c *C) {
tx, err := s.s.Begin()
txn, err := s.store.Begin()
c.Assert(err, IsNil)
defer txn.Rollback()
defer tx.Rollback()
tx := NewStructure(txn, []byte{0x00})
key := []byte("a")
@ -242,10 +246,11 @@ func (s *tesTxStructureSuite) TestHash(c *C) {
err = tx.HDel(key, []byte("fake_key"))
c.Assert(err, IsNil)
err = tx.Commit()
err = txn.Commit()
c.Assert(err, IsNil)
fn := func(t *TxStructure) error {
err = kv.RunInNewTxn(s.store, false, func(txn kv.Transaction) error {
t := NewStructure(txn, []byte{0x00})
err = t.Set(key, []byte("abc"))
c.Assert(err, IsNil)
@ -253,7 +258,6 @@ func (s *tesTxStructureSuite) TestHash(c *C) {
c.Assert(err, IsNil)
c.Assert(value, DeepEquals, []byte("abc"))
return nil
}
err = s.s.RunInNewTxn(false, fn)
})
c.Assert(err, IsNil)
}