From 5fd01e9745c339a29c37e032c4c73ee5f2bd98d3 Mon Sep 17 00:00:00 2001 From: siddontang Date: Tue, 3 Nov 2015 12:17:47 +0800 Subject: [PATCH 1/3] *: drop schema/table/index supports batch delete. --- ddl/index.go | 16 ++-------------- ddl/reorg.go | 47 ++++++++++++++++++++++++++++++++++++++++++++++- ddl/schema.go | 7 ++----- ddl/table.go | 12 +++++++----- kv/index_iter.go | 5 +++-- 5 files changed, 60 insertions(+), 27 deletions(-) diff --git a/ddl/index.go b/ddl/index.go index d7b5a646d8..0670a7f3cc 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -452,7 +452,7 @@ func (d *ddl) backfillTableIndex(t table.Table, indexInfo *model.IndexInfo, hand kvX := kv.NewKVIndex(t.IndexPrefix(), indexInfo.Name.L, indexInfo.Unique) for _, handle := range handles { - log.Error("building index...", handle, handles) + log.Debug("building index...", handle) err := kv.RunInNewTxn(d.store, true, func(txn kv.Transaction) error { // first check row exists @@ -499,18 +499,6 @@ func (d *ddl) backfillTableIndex(t table.Table, indexInfo *model.IndexInfo, hand } func (d *ddl) dropTableIndex(t table.Table, indexInfo *model.IndexInfo) error { - err := kv.RunInNewTxn(d.store, true, func(txn kv.Transaction) error { - // Remove indices. - for _, v := range t.Indices() { - if v != nil && v.X != nil && v.Name.L == indexInfo.Name.L { - if err := v.X.Drop(txn); err != nil { - return errors.Trace(err) - } - } - } - - return nil - }) - + err := d.delKeysWithPrefix(kv.GenIndexPrefix(t.IndexPrefix(), indexInfo.Name.L)) return errors.Trace(err) } diff --git a/ddl/reorg.go b/ddl/reorg.go index 106712de7c..e17237b7ab 100644 --- a/ddl/reorg.go +++ b/ddl/reorg.go @@ -15,6 +15,7 @@ package ddl import ( "fmt" + "strings" "time" "github.com/juju/errors" @@ -120,5 +121,49 @@ func (d *ddl) runReorgJob(f func() error) error { // we return errWaitReorgTimeout here too, so that outer loop will break. 
return errWaitReorgTimeout } - +} + +func (d *ddl) delKeysWithPrefix(prefix string) error { + keys := make([]string, maxBatchSize) + + for { + keys := keys[0:0] + err := kv.RunInNewTxn(d.store, true, func(txn kv.Transaction) error { + iter, err := txn.Seek([]byte(prefix)) + if err != nil { + return errors.Trace(err) + } + + defer iter.Close() + for i := 0; i < maxBatchSize; i++ { + if iter.Valid() && strings.HasPrefix(iter.Key(), prefix) { + keys = append(keys, iter.Key()) + err = iter.Next() + if err != nil { + return errors.Trace(err) + } + } else { + break + } + } + + for _, key := range keys { + err := txn.Delete([]byte(key)) + if err != nil { + return errors.Trace(err) + } + } + + return nil + }) + + if err != nil { + return errors.Trace(err) + } + + // delete no keys, return. + if len(keys) == 0 { + return nil + } + } } diff --git a/ddl/schema.go b/ddl/schema.go index c0417b4718..d0e01e60eb 100644 --- a/ddl/schema.go +++ b/ddl/schema.go @@ -160,16 +160,13 @@ func (d *ddl) onSchemaDrop(t *meta.Meta, job *model.Job) error { } func (d *ddl) dropSchemaData(dbInfo *model.DBInfo, tables []*model.TableInfo) error { - ctx := d.newReorgContext() - for _, tblInfo := range tables { alloc := autoid.NewAllocator(d.store, dbInfo.ID) t := table.TableFromMeta(alloc, tblInfo) - err := t.Truncate(ctx) + err := d.dropTableData(t) if err != nil { - ctx.FinishTxn(true) return errors.Trace(err) } } - return errors.Trace(ctx.FinishTxn(false)) + return nil } diff --git a/ddl/table.go b/ddl/table.go index 7149e10522..038fbe4ba6 100644 --- a/ddl/table.go +++ b/ddl/table.go @@ -171,12 +171,14 @@ func (d *ddl) getTable(t *meta.Meta, schemaID int64, tblInfo *model.TableInfo) ( } func (d *ddl) dropTableData(t table.Table) error { - ctx := d.newReorgContext() - - if err := t.Truncate(ctx); err != nil { - ctx.FinishTxn(true) + // delete table data + err := d.delKeysWithPrefix(t.KeyPrefix()) + if err != nil { return errors.Trace(err) } - return errors.Trace(ctx.FinishTxn(false)) + // delete 
table index + err = d.delKeysWithPrefix(t.IndexPrefix()) + + return errors.Trace(err) } diff --git a/kv/index_iter.go b/kv/index_iter.go index 8515a303bc..73a326880a 100644 --- a/kv/index_iter.go +++ b/kv/index_iter.go @@ -101,7 +101,8 @@ type kvIndex struct { prefix string } -func genIndexPrefix(indexPrefix, indexName string) string { +// GenIndexPrefix generates the index prefix. +func GenIndexPrefix(indexPrefix, indexName string) string { // Use EncodeBytes to guarantee generating different index prefix. // e.g, two indices c1 and c with index prefix p, if no EncodeBytes, // the index format looks p_c and p_c1, if c has an index value which the first encoded byte is '1', @@ -116,7 +117,7 @@ func NewKVIndex(indexPrefix, indexName string, unique bool) Index { return &kvIndex{ indexName: indexName, unique: unique, - prefix: genIndexPrefix(indexPrefix, indexName), + prefix: GenIndexPrefix(indexPrefix, indexName), } } From aeb57cd7bdcc991db96e9c4fb3f8a71ce2a29b52 Mon Sep 17 00:00:00 2001 From: siddontang Date: Wed, 4 Nov 2015 11:09:35 +0800 Subject: [PATCH 2/3] *: add ReOrgHandle to support resume reorg job. --- ddl/ddl.go | 4 ++++ ddl/index.go | 17 +++++++++++++---- ddl/reorg.go | 1 - model/ddl.go | 5 +++++ 4 files changed, 22 insertions(+), 5 deletions(-) diff --git a/ddl/ddl.go b/ddl/ddl.go index c484c7b23e..3c5d41b3c2 100644 --- a/ddl/ddl.go +++ b/ddl/ddl.go @@ -76,6 +76,10 @@ type ddl struct { // use a persistent job list. reOrgDoneCh chan error + // reOrgHandle is used for adding data reorgnization, after every batch, + // we will update for later job update. 
+ reOrgHandle int64 + quitCh chan struct{} wait sync.WaitGroup } diff --git a/ddl/index.go b/ddl/index.go index 0670a7f3cc..cfdc2aee02 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -16,6 +16,7 @@ package ddl import ( "bytes" "strings" + "sync/atomic" "github.com/juju/errors" "github.com/ngaut/log" @@ -193,6 +194,9 @@ func (d *ddl) onIndexCreate(t *meta.Meta, job *model.Job) error { indexInfo.State = model.StateReorgnization // initialize SnapshotVer to 0 for later reorgnization check. job.SnapshotVer = 0 + // initialize reorg handle to 0 + job.ReOrgHandle = 0 + atomic.StoreInt64(&d.reOrgHandle, 0) err = t.UpdateTable(schemaID, tblInfo) return errors.Trace(err) case model.StateReorgnization: @@ -215,9 +219,13 @@ func (d *ddl) onIndexCreate(t *meta.Meta, job *model.Job) error { } err = d.runReorgJob(func() error { - return d.addTableIndex(tbl, indexInfo, job.SnapshotVer) + return d.addTableIndex(tbl, indexInfo, job.SnapshotVer, job.ReOrgHandle) }) + // addTableIndex updates ReOrgHandle after one batch. + // so we update the job ReOrgHandle here. + job.ReOrgHandle = atomic.LoadInt64(&d.reOrgHandle) + if errors2.ErrorEqual(err, errWaitReorgTimeout) { // if timeout, we should return, check for the owner and re-wait job done. return nil @@ -377,8 +385,7 @@ const maxBatchSize = 1024 // 3, For one row, if the row has been already deleted, skip to next row. // 4, If not deleted, check whether index has existed, if existed, skip to next row. // 5, If index doesn't exist, create the index and then continue to handle next row. 
-func (d *ddl) addTableIndex(t table.Table, indexInfo *model.IndexInfo, version uint64) error { - seekHandle := int64(0) +func (d *ddl) addTableIndex(t table.Table, indexInfo *model.IndexInfo, version uint64, seekHandle int64) error { for { handles, err := d.getSnapshotRows(t, version, seekHandle) if err != nil { @@ -388,12 +395,14 @@ func (d *ddl) addTableIndex(t table.Table, indexInfo *model.IndexInfo, version u } seekHandle = handles[len(handles)-1] + 1 - // TODO: save seekHandle in reorgnization job, so we can resume this job later from this handle. err = d.backfillTableIndex(t, indexInfo, handles) if err != nil { return errors.Trace(err) } + + // update reOrgHandle here after every successful batch. + atomic.StoreInt64(&d.reOrgHandle, seekHandle) } } diff --git a/ddl/reorg.go b/ddl/reorg.go index e17237b7ab..19c6e0f4b3 100644 --- a/ddl/reorg.go +++ b/ddl/reorg.go @@ -98,7 +98,6 @@ const waitReorgTimeout = 10 * time.Second var errWaitReorgTimeout = errors.New("wait for reorgnization timeout") func (d *ddl) runReorgJob(f func() error) error { - // TODO use persistent reorgnization job list. if d.reOrgDoneCh == nil { // start a reorgnization job d.reOrgDoneCh = make(chan error, 1) diff --git a/model/ddl.go b/model/ddl.go index 33ee20554f..9f1f37e8ab 100644 --- a/model/ddl.go +++ b/model/ddl.go @@ -80,6 +80,11 @@ type Job struct { // snapshot version for this job. SnapshotVer uint64 `json:"snapshot_ver"` LastUpdateTS int64 `json:"last_update_ts"` + // For a reorganization job that adds data, like add index or add column, + // we will traverse the huge snapshot and add the data in batches. + // After some successful batches, we will update the ReOrgHandle for + // resuming later if the current server crashed. + ReOrgHandle int64 `json:"reorg_handle"` } // Encode encodes job with json format. 
From 8076a0cdb9191be28bc39e370862d5de1f656a8c Mon Sep 17 00:00:00 2001 From: siddontang Date: Wed, 4 Nov 2015 11:50:09 +0800 Subject: [PATCH 3/3] ddl: update test schema name for single test. --- ddl/ddl_test.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/ddl/ddl_test.go b/ddl/ddl_test.go index 854b2626da..2f55dd4d3d 100644 --- a/ddl/ddl_test.go +++ b/ddl/ddl_test.go @@ -52,7 +52,7 @@ func (ts *testSuite) SetUpSuite(c *C) { func (ts *testSuite) TestT(c *C) { se, _ := tidb.CreateSession(ts.store) ctx := se.(context.Context) - schemaName := model.NewCIStr("test") + schemaName := model.NewCIStr("test_ddl") tblName := model.NewCIStr("t") tbIdent := table.Ident{ Schema: schemaName, @@ -170,15 +170,18 @@ func (ts *testSuite) TestT(c *C) { func (ts *testSuite) TestConstraintNames(c *C) { se, _ := tidb.CreateSession(ts.store) ctx := se.(context.Context) - schemaName := model.NewCIStr("test") + schemaName := model.NewCIStr("test_constraint") tblName := model.NewCIStr("t") tbIdent := table.Ident{ Schema: schemaName, Name: tblName, } + err := sessionctx.GetDomain(ctx).DDL().CreateSchema(ctx, tbIdent.Schema) + c.Assert(err, IsNil) + tbStmt := statement("create table t (a int, b int, index a (a, b), index a (a))").(*stmts.CreateTableStmt) - err := sessionctx.GetDomain(ctx).DDL().CreateTable(ctx, tbIdent, tbStmt.Cols, tbStmt.Constraints) + err = sessionctx.GetDomain(ctx).DDL().CreateTable(ctx, tbIdent, tbStmt.Cols, tbStmt.Constraints) c.Assert(err, NotNil) tbStmt = statement("create table t (a int, b int, index A (a, b), index (a))").(*stmts.CreateTableStmt)