ddl: cleanup code

This commit is contained in:
siddontang
2015-10-22 10:27:31 +08:00
parent 791f0d6f78
commit e8222380b3

View File

@ -368,7 +368,8 @@ func (d *ddl) buildTableInfo(tableName model.CIStr, cols []*column.Col, constrai
func (d *ddl) CreateTable(ctx context.Context, ident table.Ident, colDefs []*coldef.ColumnDef, constraints []*coldef.TableConstraint) (err error) {
is := d.GetInformationSchema()
if !is.SchemaExists(ident.Schema) {
schema, ok := is.SchemaByName(ident.Schema)
if !ok {
return errors.Trace(qerror.ErrDatabaseNotExist)
}
if is.TableExists(ident.Schema, ident.Name) {
@ -400,7 +401,13 @@ func (d *ddl) CreateTable(ctx context.Context, ident table.Ident, colDefs []*col
return errors.Trace(err)
}
err = d.updateInfoSchema(ident.Schema, tbInfo, txn, true)
var b []byte
b, err = json.Marshal(tbInfo)
if err != nil {
return errors.Trace(err)
}
err = txn.CreateTable(schema.ID, tbInfo.ID, b)
return errors.Trace(err)
})
@ -413,9 +420,12 @@ func (d *ddl) CreateTable(ctx context.Context, ident table.Ident, colDefs []*col
func (d *ddl) AlterTable(ctx context.Context, ident table.Ident, specs []*AlterSpecification) (err error) {
// Get database and table.
is := d.GetInformationSchema()
if !is.SchemaExists(ident.Schema) {
schema, ok := is.SchemaByName(ident.Schema)
if !ok {
return errors.Trace(qerror.ErrDatabaseNotExist)
}
tbl, err := is.TableByName(ident.Schema, ident.Name)
if err != nil {
return errors.Trace(err)
@ -423,7 +433,7 @@ func (d *ddl) AlterTable(ctx context.Context, ident table.Ident, specs []*AlterS
for _, spec := range specs {
switch spec.Action {
case AlterAddColumn:
if err := d.addColumn(ctx, ident.Schema, tbl, spec, is.SchemaMetaVersion()); err != nil {
if err := d.addColumn(ctx, schema, tbl, spec, is.SchemaMetaVersion()); err != nil {
return errors.Trace(err)
}
default:
@ -435,7 +445,7 @@ func (d *ddl) AlterTable(ctx context.Context, ident table.Ident, specs []*AlterS
}
// Add a column into table
func (d *ddl) addColumn(ctx context.Context, schema model.CIStr, tbl table.Table, spec *AlterSpecification, schemaMetaVersion int64) error {
func (d *ddl) addColumn(ctx context.Context, schema *model.DBInfo, tbl table.Table, spec *AlterSpecification, schemaMetaVersion int64) error {
// Find position
cols := tbl.Cols()
position := len(cols)
@ -497,7 +507,8 @@ func (d *ddl) addColumn(ctx context.Context, schema model.CIStr, tbl table.Table
if err != nil {
return errors.Trace(err)
}
err = d.updateInfoSchema(schema, tb.Meta(), txn, false)
err = d.updateTableInfo(schema, tb.Meta(), txn)
return errors.Trace(err)
})
if d.onDDLChange != nil {
@ -543,38 +554,24 @@ func updateOldRows(ctx context.Context, t *tables.Table, col *column.Col) error
// DropTable will proceed even if some table in the list does not exists.
func (d *ddl) DropTable(ctx context.Context, ti table.Ident) (err error) {
is := d.GetInformationSchema()
schema, ok := is.SchemaByName(ti.Schema)
if !ok {
return errors.Trace(qerror.ErrDatabaseNotExist)
}
tb, err := is.TableByName(ti.Schema, ti.Name)
if err != nil {
return errors.Trace(err)
}
// update InfoSchema before delete all the table data.
clonedInfo := is.Clone()
err = d.meta.RunInNewTxn(false, func(txn *meta.TMeta) error {
err := d.verifySchemaMetaVersion(txn, is.SchemaMetaVersion())
if err != nil {
return errors.Trace(err)
}
for _, info := range clonedInfo {
if info.Name == ti.Schema {
var newTableInfos []*model.TableInfo
// append other tables.
var tableID int64
for _, tbInfo := range info.Tables {
if tbInfo.Name.L != ti.Name.L {
newTableInfos = append(newTableInfos, tbInfo)
} else {
tableID = tbInfo.ID
}
}
info.Tables = newTableInfos
err = txn.DropTable(info.ID, tableID)
if err != nil {
return errors.Trace(err)
}
}
}
return nil
err = txn.DropTable(schema.ID, tb.Meta().ID)
return errors.Trace(err)
})
if d.onDDLChange != nil {
err = d.onDDLChange(err)
@ -610,6 +607,11 @@ func (d *ddl) deleteTableData(ctx context.Context, t table.Table) error {
func (d *ddl) CreateIndex(ctx context.Context, ti table.Ident, unique bool, indexName model.CIStr, idxColNames []*coldef.IndexColName) error {
is := d.infoHandle.Get()
schema, ok := is.SchemaByName(ti.Schema)
if !ok {
return errors.Trace(qerror.ErrDatabaseNotExist)
}
t, err := is.TableByName(ti.Schema, ti.Name)
if err != nil {
return errors.Trace(err)
@ -664,7 +666,8 @@ func (d *ddl) CreateIndex(ctx context.Context, ti table.Ident, unique bool, inde
if err != nil {
return errors.Trace(err)
}
err = d.updateInfoSchema(ti.Schema, tbInfo, txn, false)
err = d.updateTableInfo(schema, tbInfo, txn)
return errors.Trace(err)
})
if d.onDDLChange != nil {
@ -735,37 +738,11 @@ func (d *ddl) DropIndex(ctx context.Context, schema, tableName, indexNmae model.
return nil
}
func (d *ddl) updateInfoSchema(schema model.CIStr, tbInfo *model.TableInfo, txn *meta.TMeta, create bool) error {
is := d.GetInformationSchema()
clonedInfo := is.Clone()
for _, info := range clonedInfo {
if info.Name == schema {
var match bool
for i := range info.Tables {
if info.Tables[i].Name == tbInfo.Name {
info.Tables[i] = tbInfo
match = true
}
}
if !match {
info.Tables = append(info.Tables, tbInfo)
}
b, err := json.Marshal(tbInfo)
if err != nil {
return errors.Trace(err)
}
if create {
err = txn.CreateTable(info.ID, tbInfo.ID, b)
} else {
err = txn.UpdateTable(info.ID, tbInfo.ID, b)
}
if err != nil {
return errors.Trace(err)
}
}
func (d *ddl) updateTableInfo(schema *model.DBInfo, tbInfo *model.TableInfo, txn *meta.TMeta) error {
b, err := json.Marshal(tbInfo)
if err != nil {
return errors.Trace(err)
}
return nil
return txn.UpdateTable(schema.ID, tbInfo.ID, b)
}