diff --git a/ddl/column.go b/ddl/column.go
index cbcf435456..ac41a619ff 100644
--- a/ddl/column.go
+++ b/ddl/column.go
@@ -52,7 +52,7 @@ func (d *ddl) addColumn(tblInfo *model.TableInfo, spec *AlterSpecification) (*mo
 	if spec.Position.Type == ColumnPositionFirst {
 		position = 0
 	} else if spec.Position.Type == ColumnPositionAfter {
-		c := findCol(tblInfo.Columns, spec.Position.RelativeColumn)
+		c := findCol(cols, spec.Position.RelativeColumn)
 		if c == nil {
 			return nil, 0, errors.Errorf("No such column: %v", spec.Column.Name)
 		}
diff --git a/ddl/ddl.go b/ddl/ddl.go
index bdadbccda5..c5a35d0733 100644
--- a/ddl/ddl.go
+++ b/ddl/ddl.go
@@ -74,6 +74,10 @@ type ddl struct {
 	// use a persistent job list.
 	reOrgDoneCh chan error
 
+	// reOrgHandle tracks the progress of a data reorganization job; it is
+	// updated after every batch so that the persistent job can be updated later.
+	reOrgHandle int64
+
 	quitCh chan struct{}
 	wait   sync.WaitGroup
 }
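Since the background reorganization goroutine and the DDL job runner both touch `reOrgHandle`, the field is only ever accessed through `sync/atomic`. Below is a minimal, self-contained sketch of that handoff; the `ddlStub` type, the `backfill` signature, and the batch values are illustrative stand-ins, not the PR's API.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// ddlStub stands in for the ddl struct; only the new field is shown.
type ddlStub struct {
	reOrgHandle int64 // always accessed through sync/atomic
}

// backfill simulates the background reorganization worker: after each
// successful batch it publishes the next handle to resume from.
func (d *ddlStub) backfill(batches [][]int64, done chan<- error) {
	for _, handles := range batches {
		// ... write the index entries for this batch ...
		next := handles[len(handles)-1] + 1
		atomic.StoreInt64(&d.reOrgHandle, next)
	}
	done <- nil
}

func main() {
	d := &ddlStub{}
	done := make(chan error, 1)
	go d.backfill([][]int64{{1, 2, 3}, {7, 9}}, done)
	<-done
	// The job runner snapshots the progress into the persistent job,
	// e.g. job.ReOrgHandle = atomic.LoadInt64(&d.reOrgHandle).
	fmt.Println("resume from handle:", atomic.LoadInt64(&d.reOrgHandle)) // 10
}
```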
- c.Skip("Not completely support column ddl") tbIdent2 := tbIdent tbIdent2.Name = model.NewCIStr("t2") tbStmt = statement("create table t2 (a int unique not null)").(*stmts.CreateTableStmt) @@ -110,6 +89,9 @@ func (ts *testSuite) TestT(c *C) { alterStmt := statement(`alter table t2 add b enum("bb") first`).(*stmts.AlterTableStmt) sessionctx.GetDomain(ctx).DDL().AlterTable(ctx, tbIdent2, alterStmt.Specs) c.Assert(alterStmt.Specs[0].String(), Not(Equals), "") + tb, err = sessionctx.GetDomain(ctx).InfoSchema().TableByName(tbIdent2.Schema, tbIdent2.Name) + c.Assert(err, IsNil) + c.Assert(tb, NotNil) cols, err := tb.Row(ctx, rid0) c.Assert(err, IsNil) c.Assert(len(cols), Equals, 2) @@ -168,20 +150,41 @@ func (ts *testSuite) TestT(c *C) { alterStmt = statement("alter table t add column bb int after b").(*stmts.AlterTableStmt) err = sessionctx.GetDomain(ctx).DDL().AlterTable(ctx, tbIdent, alterStmt.Specs) c.Assert(err, NotNil) + + idxStmt := statement("CREATE INDEX idx_c ON t (c)").(*stmts.CreateIndexStmt) + idxName := model.NewCIStr(idxStmt.IndexName) + err = sessionctx.GetDomain(ctx).DDL().CreateIndex(ctx, tbIdent, idxStmt.Unique, idxName, idxStmt.IndexColNames) + c.Assert(err, IsNil) + tbs := sessionctx.GetDomain(ctx).InfoSchema().SchemaTables(tbIdent.Schema) + c.Assert(len(tbs), Equals, 2) + err = sessionctx.GetDomain(ctx).DDL().DropIndex(ctx, tbIdent, idxName) + c.Assert(err, IsNil) + err = sessionctx.GetDomain(ctx).DDL().DropTable(ctx, tbIdent) + c.Assert(err, IsNil) + tbs = sessionctx.GetDomain(ctx).InfoSchema().SchemaTables(tbIdent.Schema) + c.Assert(len(tbs), Equals, 1) + + err = sessionctx.GetDomain(ctx).DDL().DropSchema(ctx, noExist) + c.Assert(errors2.ErrorEqual(err, ddl.ErrNotExists), IsTrue) + err = sessionctx.GetDomain(ctx).DDL().DropSchema(ctx, tbIdent.Schema) + c.Assert(err, IsNil) } func (ts *testSuite) TestConstraintNames(c *C) { se, _ := tidb.CreateSession(ts.store) ctx := se.(context.Context) - schemaName := model.NewCIStr("test") + schemaName := model.NewCIStr("test_constraint") tblName := model.NewCIStr("t") tbIdent := table.Ident{ Schema: schemaName, Name: tblName, } + err := sessionctx.GetDomain(ctx).DDL().CreateSchema(ctx, tbIdent.Schema) + c.Assert(err, IsNil) + tbStmt := statement("create table t (a int, b int, index a (a, b), index a (a))").(*stmts.CreateTableStmt) - err := sessionctx.GetDomain(ctx).DDL().CreateTable(ctx, tbIdent, tbStmt.Cols, tbStmt.Constraints) + err = sessionctx.GetDomain(ctx).DDL().CreateTable(ctx, tbIdent, tbStmt.Cols, tbStmt.Constraints) c.Assert(err, NotNil) tbStmt = statement("create table t (a int, b int, index A (a, b), index (a))").(*stmts.CreateTableStmt) diff --git a/ddl/index.go b/ddl/index.go index f207685725..d6092deb1f 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -15,6 +15,7 @@ package ddl import ( "bytes" + "sync/atomic" "github.com/juju/errors" "github.com/ngaut/log" @@ -159,6 +160,9 @@ func (d *ddl) onIndexCreate(t *meta.Meta, job *model.Job) error { indexInfo.State = model.StateReorgnization // initialize SnapshotVer to 0 for later reorgnization check. 
diff --git a/ddl/reorg.go b/ddl/reorg.go
index 106712de7c..19c6e0f4b3 100644
--- a/ddl/reorg.go
+++ b/ddl/reorg.go
@@ -15,6 +15,7 @@ package ddl
 
 import (
 	"fmt"
+	"strings"
 	"time"
 
 	"github.com/juju/errors"
@@ -97,7 +98,6 @@ const waitReorgTimeout = 10 * time.Second
 var errWaitReorgTimeout = errors.New("wait for reorgnization timeout")
 
 func (d *ddl) runReorgJob(f func() error) error {
-	// TODO use persistent reorgnization job list.
 	if d.reOrgDoneCh == nil {
 		// start a reorgnization job
 		d.reOrgDoneCh = make(chan error, 1)
@@ -120,5 +120,49 @@ func (d *ddl) runReorgJob(f func() error) error {
 		// we return errWaitReorgTimeout here too, so that outer loop will break.
 		return errWaitReorgTimeout
 	}
-
+}
+
+func (d *ddl) delKeysWithPrefix(prefix string) error {
+	keys := make([]string, 0, maxBatchSize)
+
+	for {
+		err := kv.RunInNewTxn(d.store, true, func(txn kv.Transaction) error {
+			// reset the batch; RunInNewTxn may retry this closure.
+			keys = keys[:0]
+			iter, err := txn.Seek([]byte(prefix))
+			if err != nil {
+				return errors.Trace(err)
+			}
+
+			defer iter.Close()
+			for i := 0; i < maxBatchSize; i++ {
+				if iter.Valid() && strings.HasPrefix(iter.Key(), prefix) {
+					keys = append(keys, iter.Key())
+					err = iter.Next()
+					if err != nil {
+						return errors.Trace(err)
+					}
+				} else {
+					break
+				}
+			}
+
+			for _, key := range keys {
+				err := txn.Delete([]byte(key))
+				if err != nil {
+					return errors.Trace(err)
+				}
+			}
+
+			return nil
+		})
+
+		if err != nil {
+			return errors.Trace(err)
+		}
+
+		// if this round deleted no keys, the prefix range is empty and we are done.
+		if len(keys) == 0 {
+			return nil
+		}
+	}
+}
diff --git a/ddl/schema.go b/ddl/schema.go
index c0417b4718..d0e01e60eb 100644
--- a/ddl/schema.go
+++ b/ddl/schema.go
@@ -160,16 +160,13 @@ func (d *ddl) onSchemaDrop(t *meta.Meta, job *model.Job) error {
 }
 
 func (d *ddl) dropSchemaData(dbInfo *model.DBInfo, tables []*model.TableInfo) error {
-	ctx := d.newReorgContext()
-
 	for _, tblInfo := range tables {
 		alloc := autoid.NewAllocator(d.store, dbInfo.ID)
 		t := table.TableFromMeta(alloc, tblInfo)
-		err := t.Truncate(ctx)
+		err := d.dropTableData(t)
 		if err != nil {
-			ctx.FinishTxn(true)
 			return errors.Trace(err)
 		}
 	}
 
-	return errors.Trace(ctx.FinishTxn(false))
+	return nil
 }
diff --git a/ddl/table.go b/ddl/table.go
index 86904fc623..3fe0d71338 100644
--- a/ddl/table.go
+++ b/ddl/table.go
@@ -193,12 +193,14 @@ func (d *ddl) getTableInfo(t *meta.Meta, job *model.Job) (*model.TableInfo, erro
 }
 
 func (d *ddl) dropTableData(t table.Table) error {
-	ctx := d.newReorgContext()
-
-	if err := t.Truncate(ctx); err != nil {
-		ctx.FinishTxn(true)
+	// delete the table row data.
+	err := d.delKeysWithPrefix(t.KeyPrefix())
+	if err != nil {
 		return errors.Trace(err)
 	}
 
-	return errors.Trace(ctx.FinishTxn(false))
+	// delete the table indices.
+	err = d.delKeysWithPrefix(t.IndexPrefix())
+
+	return errors.Trace(err)
 }
diff --git a/kv/index_iter.go b/kv/index_iter.go
index 8515a303bc..73a326880a 100644
--- a/kv/index_iter.go
+++ b/kv/index_iter.go
@@ -101,7 +101,8 @@ type kvIndex struct {
 	prefix    string
 }
 
-func genIndexPrefix(indexPrefix, indexName string) string {
+// GenIndexPrefix generates the index prefix.
+func GenIndexPrefix(indexPrefix, indexName string) string {
 	// Use EncodeBytes to guarantee generating different index prefix.
 	// e.g, two indices c1 and c with index prefix p, if no EncodeBytes,
 	// the index format looks p_c and p_c1, if c has an index value which the first encoded byte is '1',
@@ -116,7 +117,7 @@ func NewKVIndex(indexPrefix, indexName string, unique bool) Index {
 	return &kvIndex{
 		indexName: indexName,
 		unique:    unique,
-		prefix:    genIndexPrefix(indexPrefix, indexName),
+		prefix:    GenIndexPrefix(indexPrefix, indexName),
 	}
 }
diff --git a/model/ddl.go b/model/ddl.go
index 33ee20554f..9f1f37e8ab 100644
--- a/model/ddl.go
+++ b/model/ddl.go
@@ -80,6 +80,11 @@ type Job struct {
 	// snapshot version for this job.
 	SnapshotVer  uint64 `json:"snapshot_ver"`
 	LastUpdateTS int64  `json:"last_update_ts"`
+	// For data reorganization jobs like add index or add column,
+	// we traverse the huge snapshot and add the data in batches.
+	// After every successful batch we update ReOrgHandle, so the job
+	// can be resumed later if the current server crashes.
+	ReOrgHandle int64 `json:"reorg_handle"`
 }
 
 // Encode encodes job with json format.
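Because ReOrgHandle is serialized with the rest of the job (`json:"reorg_handle"`), the backfill progress survives a crash or an owner change. A small sketch of that round trip, using a trimmed-down stand-in for model.Job (the real struct has more fields):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// job carries only the fields relevant to reorganization progress.
type job struct {
	SnapshotVer  uint64 `json:"snapshot_ver"`
	LastUpdateTS int64  `json:"last_update_ts"`
	ReOrgHandle  int64  `json:"reorg_handle"`
}

func main() {
	j := job{SnapshotVer: 400, ReOrgHandle: 1024}
	b, err := json.Marshal(j)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(b)) // {"snapshot_ver":400,"last_update_ts":0,"reorg_handle":1024}

	// After a restart, the new owner decodes the persisted job and picks
	// up the backfill from ReOrgHandle instead of handle 0.
	var resumed job
	if err := json.Unmarshal(b, &resumed); err != nil {
		panic(err)
	}
	fmt.Println("resume backfill at handle", resumed.ReOrgHandle)
}
```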