From d398e37832671bc27deaa6566fe042f7e45b92ee Mon Sep 17 00:00:00 2001 From: ngaut Date: Thu, 24 Sep 2015 21:41:22 +0800 Subject: [PATCH] Refactor WIP --- ddl/ddl.go | 145 +++++++++++++++++++++++++-------------- domain/domain.go | 10 ++- infoschema/infoschema.go | 35 ++++------ meta/meta.go | 2 +- 4 files changed, 113 insertions(+), 79 deletions(-) diff --git a/ddl/ddl.go b/ddl/ddl.go index 99ce798fdb..0dce3b3200 100644 --- a/ddl/ddl.go +++ b/ddl/ddl.go @@ -94,27 +94,38 @@ func (d *ddl) CreateSchema(ctx context.Context, schema model.CIStr) (err error) if err != nil { return errors.Trace(err) } - err = d.writeSchemaInfo(info) - if err != nil { + + err = kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error { + err := d.verifySchemaMetaVersion(txn, is.SchemaMetaVersion()) + if err != nil { + return errors.Trace(err) + } + err = d.writeSchemaInfo(info, txn) return errors.Trace(err) - } + }) newInfo := append(is.Clone(), info) - d.infoHandle.Set(newInfo) + d.infoHandle.Set(newInfo, is.SchemaMetaVersion()+1) return nil } -func (d *ddl) verifySchemaMetaVersion(txn kv.Transaction) error { - curVer, err := txn.GetInt64(meta.SchemaMetaVersion) +func (d *ddl) verifySchemaMetaVersion(txn kv.Transaction, schemaMetaVersion int64) error { + curVer, err := txn.GetInt64(meta.SchemaMetaVersionKey) if err != nil { return errors.Trace(err) } - ourVer := d.infoHandle.SchemaMetaVersion() - if curVer != ourVer { - return errors.Errorf("Schema changed, our version %d, got %d", ourVer, curVer) + if curVer != schemaMetaVersion { + return errors.Errorf("Schema changed, our version %d, got %d", schemaMetaVersion, curVer) } // Increment version - _, err = txn.Inc(meta.SchemaMetaVersion, 1) + _, err = txn.Inc(meta.SchemaMetaVersionKey, 1) + if err != nil { + return errors.Trace(err) + } + + if err := txn.LockKeys(meta.SchemaMetaVersionKey); err != nil { + return errors.Trace(err) + } return errors.Trace(err) } @@ -133,7 +144,7 @@ func (d *ddl) DropSchema(ctx context.Context, schema model.CIStr) (err error) { newInfo = append(newInfo, v) } } - d.infoHandle.Set(newInfo) + d.infoHandle.Set(newInfo, is.SchemaMetaVersion()) // Remove data txn, err := ctx.GetTxn(true) @@ -158,12 +169,12 @@ func (d *ddl) DropSchema(ctx context.Context, schema model.CIStr) (err error) { // Delete meta key err = kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error { - err := d.verifySchemaMetaVersion(txn) + err := d.verifySchemaMetaVersion(txn, is.SchemaMetaVersion()) if err != nil { return errors.Trace(err) } key := []byte(meta.DBMetaKey(old.ID)) - if err := txn.LockKeys(meta.SchemaMetaVersion, key); err != nil { + if err := txn.LockKeys(key); err != nil { return errors.Trace(err) } return txn.Delete(key) @@ -386,7 +397,19 @@ func (d *ddl) CreateTable(ctx context.Context, ident table.Ident, colDefs []*col return errors.Trace(err) } log.Infof("New table: %+v", tbInfo) - err = d.updateInfoSchema(ctx, ident.Schema, tbInfo) + + err = kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error { + err := d.verifySchemaMetaVersion(txn, is.SchemaMetaVersion()) + if err != nil { + return errors.Trace(err) + } + err = d.updateInfoSchema(ident.Schema, tbInfo, txn) + return errors.Trace(err) + }) + + if d.onDDLChange != nil { + err = d.onDDLChange(err) + } return errors.Trace(err) } @@ -403,7 +426,7 @@ func (d *ddl) AlterTable(ctx context.Context, ident table.Ident, specs []*AlterS for _, spec := range specs { switch spec.Action { case AlterAddColumn: - if err := d.addColumn(ctx, ident.Schema, tbl, spec); err != nil { + if err := d.addColumn(ident.Schema, tbl, spec, is.SchemaMetaVersion()); err != nil { return errors.Trace(err) } default: @@ -415,7 +438,7 @@ func (d *ddl) AlterTable(ctx context.Context, ident table.Ident, specs []*AlterS } // Add a column into table -func (d *ddl) addColumn(ctx context.Context, schema model.CIStr, tbl table.Table, spec *AlterSpecification) error { +func (d *ddl) addColumn(schema model.CIStr, tbl table.Table, spec *AlterSpecification, schemaMetaVersion int64) error { // Find position cols := tbl.Cols() position := len(cols) @@ -468,7 +491,15 @@ func (d *ddl) addColumn(ctx context.Context, schema model.CIStr, tbl table.Table // TODO: update index // TODO: update default value // update infomation schema - err = d.updateInfoSchema(ctx, schema, tb.Meta()) + + err = kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error { + err := d.verifySchemaMetaVersion(txn, schemaMetaVersion) + if err != nil { + return errors.Trace(err) + } + err = d.updateInfoSchema(schema, tb.Meta(), txn) + return errors.Trace(err) + }) return errors.Trace(err) } @@ -481,23 +512,31 @@ func (d *ddl) DropTable(ctx context.Context, ti table.Ident) (err error) { } // update InfoSchema before delete all the table data. clonedInfo := is.Clone() - for _, info := range clonedInfo { - if info.Name == ti.Schema { - var newTableInfos []*model.TableInfo - // append other tables. - for _, tbInfo := range info.Tables { - if tbInfo.Name.L != ti.Name.L { - newTableInfos = append(newTableInfos, tbInfo) + + err = kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error { + err := d.verifySchemaMetaVersion(txn, is.SchemaMetaVersion()) + if err != nil { + return errors.Trace(err) + } + for _, info := range clonedInfo { + if info.Name == ti.Schema { + var newTableInfos []*model.TableInfo + // append other tables. + for _, tbInfo := range info.Tables { + if tbInfo.Name.L != ti.Name.L { + newTableInfos = append(newTableInfos, tbInfo) + } + } + info.Tables = newTableInfos + err = d.writeSchemaInfo(info, txn) + if err != nil { + return errors.Trace(err) } } - info.Tables = newTableInfos - err = d.writeSchemaInfo(info) - if err != nil { - return errors.Trace(err) - } } - } - d.infoHandle.Set(clonedInfo) + return nil + }) + d.infoHandle.Set(clonedInfo, is.SchemaMetaVersion()+1) err = d.deleteTableData(ctx, tb) return errors.Trace(err) } @@ -581,7 +620,15 @@ func (d *ddl) CreateIndex(ctx context.Context, ti table.Ident, unique bool, inde } // update InfoSchema - return d.updateInfoSchema(ctx, ti.Schema, tbInfo) + err = kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error { + err := d.verifySchemaMetaVersion(txn, is.SchemaMetaVersion()) + if err != nil { + return errors.Trace(err) + } + err = d.updateInfoSchema(ti.Schema, tbInfo, txn) + return errors.Trace(err) + }) + return errors.Trace(err) } func (d *ddl) buildIndex(ctx context.Context, t table.Table, idxInfo *model.IndexInfo, unique bool) error { @@ -646,32 +693,28 @@ func (d *ddl) DropIndex(ctx context.Context, schema, tableName, indexNmae model. return nil } -func (d *ddl) writeSchemaInfo(info *model.DBInfo) error { +func (d *ddl) writeSchemaInfo(info *model.DBInfo, txn kv.Transaction) error { var b []byte b, err := json.Marshal(info) if err != nil { return errors.Trace(err) } - err = kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error { - err := d.verifySchemaMetaVersion(txn) - if err != nil { - return errors.Trace(err) - } - key := []byte(meta.DBMetaKey(info.ID)) - if err := txn.LockKeys(meta.SchemaMetaVersion, key); err != nil { - return errors.Trace(err) - } - return txn.Set(key, b) - }) - log.Warn("save schema", string(b)) - if d.onDDLChange != nil { - err = d.onDDLChange(err) + if err != nil { + return errors.Trace(err) } + key := []byte(meta.DBMetaKey(info.ID)) + if err := txn.LockKeys(key); err != nil { + return errors.Trace(err) + } + txn.Set(key, b) + log.Warn("save schema", string(b)) + return errors.Trace(err) } -func (d *ddl) updateInfoSchema(ctx context.Context, schema model.CIStr, tbInfo *model.TableInfo) error { - clonedInfo := d.GetInformationSchema().Clone() +func (d *ddl) updateInfoSchema(schema model.CIStr, tbInfo *model.TableInfo, txn kv.Transaction) error { + is := d.GetInformationSchema() + clonedInfo := is.Clone() for _, info := range clonedInfo { if info.Name == schema { var match bool @@ -684,12 +727,12 @@ func (d *ddl) updateInfoSchema(ctx context.Context, schema model.CIStr, tbInfo * if !match { info.Tables = append(info.Tables, tbInfo) } - err := d.writeSchemaInfo(info) + err := d.writeSchemaInfo(info, txn) if err != nil { return errors.Trace(err) } } } - d.infoHandle.Set(clonedInfo) + d.infoHandle.Set(clonedInfo, is.SchemaMetaVersion()+1) return nil } diff --git a/domain/domain.go b/domain/domain.go index 11a7b7b223..38a3dc7944 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -48,13 +48,12 @@ func (do *Domain) loadInfoSchema(txn kv.Transaction) (err error) { if err != nil { return errors.Trace(err) } - schemaMetaVersion, err := txn.GetInt64(meta.SchemaMetaVersion) + schemaMetaVersion, err := txn.GetInt64(meta.SchemaMetaVersionKey) if err != nil { return } log.Info("loadInfoSchema %d", schemaMetaVersion) - do.infoHandle.SetSchemaMetaVersion(schemaMetaVersion) - do.infoHandle.Set(schemas) + do.infoHandle.Set(schemas, schemaMetaVersion) return } @@ -83,9 +82,6 @@ func (do *Domain) onDDLChange(err error) error { } func (do *Domain) reload() error { - infoHandle := infoschema.NewHandle(do.store) - do.infoHandle = infoHandle - do.ddl = ddl.NewDDL(do.store, infoHandle, do.onDDLChange) err := kv.RunInNewTxn(do.store, false, do.loadInfoSchema) return errors.Trace(err) } @@ -96,6 +92,8 @@ func NewDomain(store kv.Storage) (d *Domain, err error) { store: store, } + d.infoHandle = infoschema.NewHandle(d.store) + d.ddl = ddl.NewDDL(d.store, d.infoHandle, d.onDDLChange) err = d.reload() return d, errors.Trace(err) } diff --git a/infoschema/infoschema.go b/infoschema/infoschema.go index de516b5439..4ba0566d8f 100644 --- a/infoschema/infoschema.go +++ b/infoschema/infoschema.go @@ -45,6 +45,7 @@ type InfoSchema interface { AllSchemas() []*model.DBInfo Clone() (result []*model.DBInfo) SchemaTables(schema model.CIStr) []table.Table + SchemaMetaVersion() int64 // TODO: add more methods to retrieve tables and columns. } @@ -62,6 +63,9 @@ type infoSchema struct { columns map[int64]*model.ColumnInfo indices map[indexName]*model.IndexInfo columnIndices map[int64][]*model.IndexInfo + + // We should to check version when change schema + schemaMetaVersion int64 } type tableName struct { @@ -185,8 +189,6 @@ func (is *infoSchema) Clone() (result []*model.DBInfo) { type Handle struct { value atomic.Value store kv.Storage - // We should to check version when change schema - schemaMetaVersion int64 } // NewHandle creates a new Handle. @@ -196,27 +198,18 @@ func NewHandle(store kv.Storage) *Handle { } } -// SetSchemaMetaVersion set schema meta version -func (h *Handle) SetSchemaMetaVersion(ver int64) { - h.schemaMetaVersion = ver -} - -// SchemaMetaVersion return version of schema meta -func (h *Handle) SchemaMetaVersion() int64 { - return h.schemaMetaVersion -} - // Set sets DBInfo to information schema. -func (h *Handle) Set(newInfo []*model.DBInfo) { +func (h *Handle) Set(newInfo []*model.DBInfo, schemaMetaVersion int64) { info := &infoSchema{ - schemaNameToID: map[string]int64{}, - tableNameToID: map[tableName]int64{}, - columnNameToID: map[columnName]int64{}, - schemas: map[int64]*model.DBInfo{}, - tables: map[int64]table.Table{}, - columns: map[int64]*model.ColumnInfo{}, - indices: map[indexName]*model.IndexInfo{}, - columnIndices: map[int64][]*model.IndexInfo{}, + schemaNameToID: map[string]int64{}, + tableNameToID: map[tableName]int64{}, + columnNameToID: map[columnName]int64{}, + schemas: map[int64]*model.DBInfo{}, + tables: map[int64]table.Table{}, + columns: map[int64]*model.ColumnInfo{}, + indices: map[indexName]*model.IndexInfo{}, + columnIndices: map[int64][]*model.IndexInfo{}, + schemaMetaVersion: schemaMetaVersion, } for _, di := range newInfo { info.schemas[di.ID] = di diff --git a/meta/meta.go b/meta/meta.go index 77fff0850d..eb710b9528 100644 --- a/meta/meta.go +++ b/meta/meta.go @@ -31,7 +31,7 @@ const ( var ( nextGlobalIDPrefix = []byte("mNextGlobalID") // SchemaMetaVersion is used as lock for changing schema - SchemaMetaVersion = []byte("mSchemaVersion") + SchemaMetaVersionKey = []byte("mSchemaVersion") ) // GenID adds step to the value for key and returns the sum.