Refactor WIP
This commit is contained in:
145
ddl/ddl.go
145
ddl/ddl.go
@ -94,27 +94,38 @@ func (d *ddl) CreateSchema(ctx context.Context, schema model.CIStr) (err error)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
err = d.writeSchemaInfo(info)
|
||||
if err != nil {
|
||||
|
||||
err = kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error {
|
||||
err := d.verifySchemaMetaVersion(txn, is.SchemaMetaVersion())
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
err = d.writeSchemaInfo(info, txn)
|
||||
return errors.Trace(err)
|
||||
}
|
||||
})
|
||||
newInfo := append(is.Clone(), info)
|
||||
d.infoHandle.Set(newInfo)
|
||||
d.infoHandle.Set(newInfo, is.SchemaMetaVersion()+1)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *ddl) verifySchemaMetaVersion(txn kv.Transaction) error {
|
||||
curVer, err := txn.GetInt64(meta.SchemaMetaVersion)
|
||||
func (d *ddl) verifySchemaMetaVersion(txn kv.Transaction, schemaMetaVersion int64) error {
|
||||
curVer, err := txn.GetInt64(meta.SchemaMetaVersionKey)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
ourVer := d.infoHandle.SchemaMetaVersion()
|
||||
if curVer != ourVer {
|
||||
return errors.Errorf("Schema changed, our version %d, got %d", ourVer, curVer)
|
||||
if curVer != schemaMetaVersion {
|
||||
return errors.Errorf("Schema changed, our version %d, got %d", schemaMetaVersion, curVer)
|
||||
}
|
||||
|
||||
// Increment version
|
||||
_, err = txn.Inc(meta.SchemaMetaVersion, 1)
|
||||
_, err = txn.Inc(meta.SchemaMetaVersionKey, 1)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
if err := txn.LockKeys(meta.SchemaMetaVersionKey); err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
@ -133,7 +144,7 @@ func (d *ddl) DropSchema(ctx context.Context, schema model.CIStr) (err error) {
|
||||
newInfo = append(newInfo, v)
|
||||
}
|
||||
}
|
||||
d.infoHandle.Set(newInfo)
|
||||
d.infoHandle.Set(newInfo, is.SchemaMetaVersion())
|
||||
|
||||
// Remove data
|
||||
txn, err := ctx.GetTxn(true)
|
||||
@ -158,12 +169,12 @@ func (d *ddl) DropSchema(ctx context.Context, schema model.CIStr) (err error) {
|
||||
|
||||
// Delete meta key
|
||||
err = kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error {
|
||||
err := d.verifySchemaMetaVersion(txn)
|
||||
err := d.verifySchemaMetaVersion(txn, is.SchemaMetaVersion())
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
key := []byte(meta.DBMetaKey(old.ID))
|
||||
if err := txn.LockKeys(meta.SchemaMetaVersion, key); err != nil {
|
||||
if err := txn.LockKeys(key); err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
return txn.Delete(key)
|
||||
@ -386,7 +397,19 @@ func (d *ddl) CreateTable(ctx context.Context, ident table.Ident, colDefs []*col
|
||||
return errors.Trace(err)
|
||||
}
|
||||
log.Infof("New table: %+v", tbInfo)
|
||||
err = d.updateInfoSchema(ctx, ident.Schema, tbInfo)
|
||||
|
||||
err = kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error {
|
||||
err := d.verifySchemaMetaVersion(txn, is.SchemaMetaVersion())
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
err = d.updateInfoSchema(ident.Schema, tbInfo, txn)
|
||||
return errors.Trace(err)
|
||||
})
|
||||
|
||||
if d.onDDLChange != nil {
|
||||
err = d.onDDLChange(err)
|
||||
}
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
@ -403,7 +426,7 @@ func (d *ddl) AlterTable(ctx context.Context, ident table.Ident, specs []*AlterS
|
||||
for _, spec := range specs {
|
||||
switch spec.Action {
|
||||
case AlterAddColumn:
|
||||
if err := d.addColumn(ctx, ident.Schema, tbl, spec); err != nil {
|
||||
if err := d.addColumn(ident.Schema, tbl, spec, is.SchemaMetaVersion()); err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
default:
|
||||
@ -415,7 +438,7 @@ func (d *ddl) AlterTable(ctx context.Context, ident table.Ident, specs []*AlterS
|
||||
}
|
||||
|
||||
// Add a column into table
|
||||
func (d *ddl) addColumn(ctx context.Context, schema model.CIStr, tbl table.Table, spec *AlterSpecification) error {
|
||||
func (d *ddl) addColumn(schema model.CIStr, tbl table.Table, spec *AlterSpecification, schemaMetaVersion int64) error {
|
||||
// Find position
|
||||
cols := tbl.Cols()
|
||||
position := len(cols)
|
||||
@ -468,7 +491,15 @@ func (d *ddl) addColumn(ctx context.Context, schema model.CIStr, tbl table.Table
|
||||
// TODO: update index
|
||||
// TODO: update default value
|
||||
// update infomation schema
|
||||
err = d.updateInfoSchema(ctx, schema, tb.Meta())
|
||||
|
||||
err = kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error {
|
||||
err := d.verifySchemaMetaVersion(txn, schemaMetaVersion)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
err = d.updateInfoSchema(schema, tb.Meta(), txn)
|
||||
return errors.Trace(err)
|
||||
})
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
@ -481,23 +512,31 @@ func (d *ddl) DropTable(ctx context.Context, ti table.Ident) (err error) {
|
||||
}
|
||||
// update InfoSchema before delete all the table data.
|
||||
clonedInfo := is.Clone()
|
||||
for _, info := range clonedInfo {
|
||||
if info.Name == ti.Schema {
|
||||
var newTableInfos []*model.TableInfo
|
||||
// append other tables.
|
||||
for _, tbInfo := range info.Tables {
|
||||
if tbInfo.Name.L != ti.Name.L {
|
||||
newTableInfos = append(newTableInfos, tbInfo)
|
||||
|
||||
err = kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error {
|
||||
err := d.verifySchemaMetaVersion(txn, is.SchemaMetaVersion())
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
for _, info := range clonedInfo {
|
||||
if info.Name == ti.Schema {
|
||||
var newTableInfos []*model.TableInfo
|
||||
// append other tables.
|
||||
for _, tbInfo := range info.Tables {
|
||||
if tbInfo.Name.L != ti.Name.L {
|
||||
newTableInfos = append(newTableInfos, tbInfo)
|
||||
}
|
||||
}
|
||||
info.Tables = newTableInfos
|
||||
err = d.writeSchemaInfo(info, txn)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
}
|
||||
info.Tables = newTableInfos
|
||||
err = d.writeSchemaInfo(info)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
d.infoHandle.Set(clonedInfo)
|
||||
return nil
|
||||
})
|
||||
d.infoHandle.Set(clonedInfo, is.SchemaMetaVersion()+1)
|
||||
err = d.deleteTableData(ctx, tb)
|
||||
return errors.Trace(err)
|
||||
}
|
||||
@ -581,7 +620,15 @@ func (d *ddl) CreateIndex(ctx context.Context, ti table.Ident, unique bool, inde
|
||||
}
|
||||
|
||||
// update InfoSchema
|
||||
return d.updateInfoSchema(ctx, ti.Schema, tbInfo)
|
||||
err = kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error {
|
||||
err := d.verifySchemaMetaVersion(txn, is.SchemaMetaVersion())
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
err = d.updateInfoSchema(ti.Schema, tbInfo, txn)
|
||||
return errors.Trace(err)
|
||||
})
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
func (d *ddl) buildIndex(ctx context.Context, t table.Table, idxInfo *model.IndexInfo, unique bool) error {
|
||||
@ -646,32 +693,28 @@ func (d *ddl) DropIndex(ctx context.Context, schema, tableName, indexNmae model.
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *ddl) writeSchemaInfo(info *model.DBInfo) error {
|
||||
func (d *ddl) writeSchemaInfo(info *model.DBInfo, txn kv.Transaction) error {
|
||||
var b []byte
|
||||
b, err := json.Marshal(info)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
err = kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error {
|
||||
err := d.verifySchemaMetaVersion(txn)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
key := []byte(meta.DBMetaKey(info.ID))
|
||||
if err := txn.LockKeys(meta.SchemaMetaVersion, key); err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
return txn.Set(key, b)
|
||||
})
|
||||
log.Warn("save schema", string(b))
|
||||
if d.onDDLChange != nil {
|
||||
err = d.onDDLChange(err)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
key := []byte(meta.DBMetaKey(info.ID))
|
||||
if err := txn.LockKeys(key); err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
txn.Set(key, b)
|
||||
log.Warn("save schema", string(b))
|
||||
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
func (d *ddl) updateInfoSchema(ctx context.Context, schema model.CIStr, tbInfo *model.TableInfo) error {
|
||||
clonedInfo := d.GetInformationSchema().Clone()
|
||||
func (d *ddl) updateInfoSchema(schema model.CIStr, tbInfo *model.TableInfo, txn kv.Transaction) error {
|
||||
is := d.GetInformationSchema()
|
||||
clonedInfo := is.Clone()
|
||||
for _, info := range clonedInfo {
|
||||
if info.Name == schema {
|
||||
var match bool
|
||||
@ -684,12 +727,12 @@ func (d *ddl) updateInfoSchema(ctx context.Context, schema model.CIStr, tbInfo *
|
||||
if !match {
|
||||
info.Tables = append(info.Tables, tbInfo)
|
||||
}
|
||||
err := d.writeSchemaInfo(info)
|
||||
err := d.writeSchemaInfo(info, txn)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
d.infoHandle.Set(clonedInfo)
|
||||
d.infoHandle.Set(clonedInfo, is.SchemaMetaVersion()+1)
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -48,13 +48,12 @@ func (do *Domain) loadInfoSchema(txn kv.Transaction) (err error) {
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
schemaMetaVersion, err := txn.GetInt64(meta.SchemaMetaVersion)
|
||||
schemaMetaVersion, err := txn.GetInt64(meta.SchemaMetaVersionKey)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
log.Info("loadInfoSchema %d", schemaMetaVersion)
|
||||
do.infoHandle.SetSchemaMetaVersion(schemaMetaVersion)
|
||||
do.infoHandle.Set(schemas)
|
||||
do.infoHandle.Set(schemas, schemaMetaVersion)
|
||||
return
|
||||
}
|
||||
|
||||
@ -83,9 +82,6 @@ func (do *Domain) onDDLChange(err error) error {
|
||||
}
|
||||
|
||||
func (do *Domain) reload() error {
|
||||
infoHandle := infoschema.NewHandle(do.store)
|
||||
do.infoHandle = infoHandle
|
||||
do.ddl = ddl.NewDDL(do.store, infoHandle, do.onDDLChange)
|
||||
err := kv.RunInNewTxn(do.store, false, do.loadInfoSchema)
|
||||
return errors.Trace(err)
|
||||
}
|
||||
@ -96,6 +92,8 @@ func NewDomain(store kv.Storage) (d *Domain, err error) {
|
||||
store: store,
|
||||
}
|
||||
|
||||
d.infoHandle = infoschema.NewHandle(d.store)
|
||||
d.ddl = ddl.NewDDL(d.store, d.infoHandle, d.onDDLChange)
|
||||
err = d.reload()
|
||||
return d, errors.Trace(err)
|
||||
}
|
||||
|
||||
@ -45,6 +45,7 @@ type InfoSchema interface {
|
||||
AllSchemas() []*model.DBInfo
|
||||
Clone() (result []*model.DBInfo)
|
||||
SchemaTables(schema model.CIStr) []table.Table
|
||||
SchemaMetaVersion() int64
|
||||
// TODO: add more methods to retrieve tables and columns.
|
||||
}
|
||||
|
||||
@ -62,6 +63,9 @@ type infoSchema struct {
|
||||
columns map[int64]*model.ColumnInfo
|
||||
indices map[indexName]*model.IndexInfo
|
||||
columnIndices map[int64][]*model.IndexInfo
|
||||
|
||||
// We should to check version when change schema
|
||||
schemaMetaVersion int64
|
||||
}
|
||||
|
||||
type tableName struct {
|
||||
@ -185,8 +189,6 @@ func (is *infoSchema) Clone() (result []*model.DBInfo) {
|
||||
type Handle struct {
|
||||
value atomic.Value
|
||||
store kv.Storage
|
||||
// We should to check version when change schema
|
||||
schemaMetaVersion int64
|
||||
}
|
||||
|
||||
// NewHandle creates a new Handle.
|
||||
@ -196,27 +198,18 @@ func NewHandle(store kv.Storage) *Handle {
|
||||
}
|
||||
}
|
||||
|
||||
// SetSchemaMetaVersion set schema meta version
|
||||
func (h *Handle) SetSchemaMetaVersion(ver int64) {
|
||||
h.schemaMetaVersion = ver
|
||||
}
|
||||
|
||||
// SchemaMetaVersion return version of schema meta
|
||||
func (h *Handle) SchemaMetaVersion() int64 {
|
||||
return h.schemaMetaVersion
|
||||
}
|
||||
|
||||
// Set sets DBInfo to information schema.
|
||||
func (h *Handle) Set(newInfo []*model.DBInfo) {
|
||||
func (h *Handle) Set(newInfo []*model.DBInfo, schemaMetaVersion int64) {
|
||||
info := &infoSchema{
|
||||
schemaNameToID: map[string]int64{},
|
||||
tableNameToID: map[tableName]int64{},
|
||||
columnNameToID: map[columnName]int64{},
|
||||
schemas: map[int64]*model.DBInfo{},
|
||||
tables: map[int64]table.Table{},
|
||||
columns: map[int64]*model.ColumnInfo{},
|
||||
indices: map[indexName]*model.IndexInfo{},
|
||||
columnIndices: map[int64][]*model.IndexInfo{},
|
||||
schemaNameToID: map[string]int64{},
|
||||
tableNameToID: map[tableName]int64{},
|
||||
columnNameToID: map[columnName]int64{},
|
||||
schemas: map[int64]*model.DBInfo{},
|
||||
tables: map[int64]table.Table{},
|
||||
columns: map[int64]*model.ColumnInfo{},
|
||||
indices: map[indexName]*model.IndexInfo{},
|
||||
columnIndices: map[int64][]*model.IndexInfo{},
|
||||
schemaMetaVersion: schemaMetaVersion,
|
||||
}
|
||||
for _, di := range newInfo {
|
||||
info.schemas[di.ID] = di
|
||||
|
||||
@ -31,7 +31,7 @@ const (
|
||||
var (
|
||||
nextGlobalIDPrefix = []byte("mNextGlobalID")
|
||||
// SchemaMetaVersion is used as lock for changing schema
|
||||
SchemaMetaVersion = []byte("mSchemaVersion")
|
||||
SchemaMetaVersionKey = []byte("mSchemaVersion")
|
||||
)
|
||||
|
||||
// GenID adds step to the value for key and returns the sum.
|
||||
|
||||
Reference in New Issue
Block a user