diff --git a/ddl/ddl.go b/ddl/ddl.go index 01a2e76df0..aff01f4202 100644 --- a/ddl/ddl.go +++ b/ddl/ddl.go @@ -368,7 +368,8 @@ func (d *ddl) buildTableInfo(tableName model.CIStr, cols []*column.Col, constrai func (d *ddl) CreateTable(ctx context.Context, ident table.Ident, colDefs []*coldef.ColumnDef, constraints []*coldef.TableConstraint) (err error) { is := d.GetInformationSchema() - if !is.SchemaExists(ident.Schema) { + schema, ok := is.SchemaByName(ident.Schema) + if !ok { return errors.Trace(qerror.ErrDatabaseNotExist) } if is.TableExists(ident.Schema, ident.Name) { @@ -400,7 +401,13 @@ func (d *ddl) CreateTable(ctx context.Context, ident table.Ident, colDefs []*col return errors.Trace(err) } - err = d.updateInfoSchema(ident.Schema, tbInfo, txn, true) + var b []byte + b, err = json.Marshal(tbInfo) + if err != nil { + return errors.Trace(err) + } + + err = txn.CreateTable(schema.ID, tbInfo.ID, b) return errors.Trace(err) }) @@ -413,9 +420,12 @@ func (d *ddl) CreateTable(ctx context.Context, ident table.Ident, colDefs []*col func (d *ddl) AlterTable(ctx context.Context, ident table.Ident, specs []*AlterSpecification) (err error) { // Get database and table. is := d.GetInformationSchema() - if !is.SchemaExists(ident.Schema) { + + schema, ok := is.SchemaByName(ident.Schema) + if !ok { return errors.Trace(qerror.ErrDatabaseNotExist) } + tbl, err := is.TableByName(ident.Schema, ident.Name) if err != nil { return errors.Trace(err) @@ -423,7 +433,7 @@ func (d *ddl) AlterTable(ctx context.Context, ident table.Ident, specs []*AlterS for _, spec := range specs { switch spec.Action { case AlterAddColumn: - if err := d.addColumn(ctx, ident.Schema, tbl, spec, is.SchemaMetaVersion()); err != nil { + if err := d.addColumn(ctx, schema, tbl, spec, is.SchemaMetaVersion()); err != nil { return errors.Trace(err) } default: @@ -435,7 +445,7 @@ func (d *ddl) AlterTable(ctx context.Context, ident table.Ident, specs []*AlterS } // Add a column into table -func (d *ddl) addColumn(ctx context.Context, schema model.CIStr, tbl table.Table, spec *AlterSpecification, schemaMetaVersion int64) error { +func (d *ddl) addColumn(ctx context.Context, schema *model.DBInfo, tbl table.Table, spec *AlterSpecification, schemaMetaVersion int64) error { // Find position cols := tbl.Cols() position := len(cols) @@ -497,7 +507,8 @@ func (d *ddl) addColumn(ctx context.Context, schema model.CIStr, tbl table.Table if err != nil { return errors.Trace(err) } - err = d.updateInfoSchema(schema, tb.Meta(), txn, false) + + err = d.updateTableInfo(schema, tb.Meta(), txn) return errors.Trace(err) }) if d.onDDLChange != nil { @@ -543,38 +554,24 @@ func updateOldRows(ctx context.Context, t *tables.Table, col *column.Col) error // DropTable will proceed even if some table in the list does not exists. func (d *ddl) DropTable(ctx context.Context, ti table.Ident) (err error) { is := d.GetInformationSchema() + schema, ok := is.SchemaByName(ti.Schema) + if !ok { + return errors.Trace(qerror.ErrDatabaseNotExist) + } + tb, err := is.TableByName(ti.Schema, ti.Name) if err != nil { return errors.Trace(err) } - // update InfoSchema before delete all the table data. - clonedInfo := is.Clone() err = d.meta.RunInNewTxn(false, func(txn *meta.TMeta) error { err := d.verifySchemaMetaVersion(txn, is.SchemaMetaVersion()) if err != nil { return errors.Trace(err) } - for _, info := range clonedInfo { - if info.Name == ti.Schema { - var newTableInfos []*model.TableInfo - // append other tables. - var tableID int64 - for _, tbInfo := range info.Tables { - if tbInfo.Name.L != ti.Name.L { - newTableInfos = append(newTableInfos, tbInfo) - } else { - tableID = tbInfo.ID - } - } - info.Tables = newTableInfos - err = txn.DropTable(info.ID, tableID) - if err != nil { - return errors.Trace(err) - } - } - } - return nil + + err = txn.DropTable(schema.ID, tb.Meta().ID) + return errors.Trace(err) }) if d.onDDLChange != nil { err = d.onDDLChange(err) @@ -610,6 +607,11 @@ func (d *ddl) deleteTableData(ctx context.Context, t table.Table) error { func (d *ddl) CreateIndex(ctx context.Context, ti table.Ident, unique bool, indexName model.CIStr, idxColNames []*coldef.IndexColName) error { is := d.infoHandle.Get() + schema, ok := is.SchemaByName(ti.Schema) + if !ok { + return errors.Trace(qerror.ErrDatabaseNotExist) + } + t, err := is.TableByName(ti.Schema, ti.Name) if err != nil { return errors.Trace(err) @@ -664,7 +666,8 @@ func (d *ddl) CreateIndex(ctx context.Context, ti table.Ident, unique bool, inde if err != nil { return errors.Trace(err) } - err = d.updateInfoSchema(ti.Schema, tbInfo, txn, false) + + err = d.updateTableInfo(schema, tbInfo, txn) return errors.Trace(err) }) if d.onDDLChange != nil { @@ -735,37 +738,11 @@ func (d *ddl) DropIndex(ctx context.Context, schema, tableName, indexNmae model. return nil } -func (d *ddl) updateInfoSchema(schema model.CIStr, tbInfo *model.TableInfo, txn *meta.TMeta, create bool) error { - is := d.GetInformationSchema() - clonedInfo := is.Clone() - for _, info := range clonedInfo { - if info.Name == schema { - var match bool - for i := range info.Tables { - if info.Tables[i].Name == tbInfo.Name { - info.Tables[i] = tbInfo - match = true - } - } - if !match { - info.Tables = append(info.Tables, tbInfo) - } - - b, err := json.Marshal(tbInfo) - if err != nil { - return errors.Trace(err) - } - - if create { - err = txn.CreateTable(info.ID, tbInfo.ID, b) - } else { - err = txn.UpdateTable(info.ID, tbInfo.ID, b) - } - - if err != nil { - return errors.Trace(err) - } - } +func (d *ddl) updateTableInfo(schema *model.DBInfo, tbInfo *model.TableInfo, txn *meta.TMeta) error { + b, err := json.Marshal(tbInfo) + if err != nil { + return errors.Trace(err) } - return nil + + return txn.UpdateTable(schema.ID, tbInfo.ID, b) }