From afdc869ef24e279c8f92750310085e8bd71ea8d3 Mon Sep 17 00:00:00 2001 From: siddontang Date: Mon, 2 Nov 2015 22:02:03 +0800 Subject: [PATCH] ddl: batch to add index. --- ddl/index.go | 99 ++++++++++++++++++++++++++++++++--------------- ddl/index_test.go | 17 ++++++-- 2 files changed, 81 insertions(+), 35 deletions(-) diff --git a/ddl/index.go b/ddl/index.go index 8d82ef859d..3e51a8e7f5 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -369,6 +369,8 @@ func fetchRowColVals(txn kv.Transaction, t table.Table, handle int64, indexInfo return vals, nil } +const maxBatchSize = 1024 + // How to add index in reorgnization state? // 1, Generate a snapshot with special version. // 2, Traverse the snapshot, get every row in the table. @@ -376,22 +378,42 @@ func fetchRowColVals(txn kv.Transaction, t table.Table, handle int64, indexInfo // 4, If not deleted, check whether index has existed, if existed, skip to next row. // 5, If index doesn't exist, create the index and then continue to handle next row. func (d *ddl) addTableIndex(t table.Table, indexInfo *model.IndexInfo, version uint64) error { + seekHandle := int64(0) + for { + handles, err := d.getSnapshotRows(t, version, seekHandle) + if err != nil { + return errors.Trace(err) + } else if len(handles) == 0 { + return nil + } + + seekHandle = handles[len(handles)-1] + 1 + // TODO: save seekHandle in reorgnization job, so we can resume this job later from this handle. + + err = d.backfillTableIndex(t, indexInfo, handles) + if err != nil { + return errors.Trace(err) + } + } +} + +func (d *ddl) getSnapshotRows(t table.Table, version uint64, seekHandle int64) ([]int64, error) { ver := kv.Version{Ver: version} snap, err := d.store.GetSnapshot(ver) if err != nil { - return errors.Trace(err) + return nil, errors.Trace(err) } defer snap.MvccRelease() - firstKey := t.FirstKey() + firstKey := t.RecordKey(seekHandle, nil) prefix := []byte(t.KeyPrefix()) it := snap.NewMvccIterator(kv.EncodeKey([]byte(firstKey)), ver) defer it.Close() - kvX := kv.NewKVIndex(t.IndexPrefix(), indexInfo.Name.L, indexInfo.Unique) + handles := make([]int64, 0, maxBatchSize) for it.Valid() { key := kv.DecodeKey([]byte(it.Key())) @@ -402,17 +424,39 @@ func (d *ddl) addTableIndex(t table.Table, indexInfo *model.IndexInfo, version u var handle int64 handle, err = util.DecodeHandleFromRowKey(string(key)) if err != nil { - return errors.Trace(err) + return nil, errors.Trace(err) } - log.Info("building index...", handle) + rk := kv.EncodeKey(t.RecordKey(handle, nil)) - // the first key in one row is the lock. - lock := t.RecordKey(handle, nil) - err = kv.RunInNewTxn(d.store, true, func(txn kv.Transaction) error { + handles = append(handles, handle) + if len(handles) == maxBatchSize { + seekHandle = handle + 1 + break + } + + it, err = kv.NextUntil(it, util.RowKeyPrefixFilter(rk)) + if errors2.ErrorEqual(err, kv.ErrNotExist) { + break + } else if err != nil { + return nil, errors.Trace(err) + } + } + + snap.MvccRelease() + + return handles, nil +} + +func (d *ddl) backfillTableIndex(t table.Table, indexInfo *model.IndexInfo, handles []int64) error { + kvX := kv.NewKVIndex(t.IndexPrefix(), indexInfo.Name.L, indexInfo.Unique) + + for _, handle := range handles { + log.Error("building index...", handle, handles) + + err := kv.RunInNewTxn(d.store, true, func(txn kv.Transaction) error { // first check row exists - var exist bool - exist, err = checkRowExist(txn, t, handle) + exist, err := checkRowExist(txn, t, handle) if err != nil { return errors.Trace(err) } else if !exist { @@ -436,21 +480,18 @@ func (d *ddl) addTableIndex(t table.Table, indexInfo *model.IndexInfo, version u // mean we haven't already added this index. // lock row first - err = txn.LockKeys(lock) + err = txn.LockKeys(t.RecordKey(handle, nil)) if err != nil { return errors.Trace(err) } // create the index. + log.Errorf("create index %d", handle) err = kvX.Create(txn, vals, handle) return errors.Trace(err) }) - rk := kv.EncodeKey(lock) - it, err = kv.NextUntil(it, util.RowKeyPrefixFilter(rk)) - if errors2.ErrorEqual(err, kv.ErrNotExist) { - break - } else if err != nil { + if err != nil { return errors.Trace(err) } } @@ -459,22 +500,18 @@ func (d *ddl) addTableIndex(t table.Table, indexInfo *model.IndexInfo, version u } func (d *ddl) dropTableIndex(t table.Table, indexInfo *model.IndexInfo) error { - ctx := d.newReorgContext() - txn, err := ctx.GetTxn(true) - - if err != nil { - return errors.Trace(err) - } - - // Remove indices. - for _, v := range t.Indices() { - if v != nil && v.X != nil && v.Name.L == indexInfo.Name.L { - if err = v.X.Drop(txn); err != nil { - ctx.FinishTxn(true) - return errors.Trace(err) + err := kv.RunInNewTxn(d.store, true, func(txn kv.Transaction) error { + // Remove indices. + for _, v := range t.Indices() { + if v != nil && v.X != nil && v.Name.L == indexInfo.Name.L { + if err := v.X.Drop(txn); err != nil { + return errors.Trace(err) + } } } - } - return errors.Trace(ctx.FinishTxn(false)) + return nil + }) + + return errors.Trace(err) } diff --git a/ddl/index_test.go b/ddl/index_test.go index 07cb64c04b..322ea9478f 100644 --- a/ddl/index_test.go +++ b/ddl/index_test.go @@ -90,7 +90,8 @@ func (s *testIndexSuite) TestIndex(c *C) { t := testGetTable(c, s.d, s.dbInfo.ID, tblInfo.ID) - for i := 0; i < 10; i++ { + num := 10 + for i := 0; i < num; i++ { _, err = t.AddRecord(ctx, []interface{}{i, i, i}) c.Assert(err, IsNil) } @@ -112,10 +113,10 @@ func (s *testIndexSuite) TestIndex(c *C) { index := t.FindIndexByColName("c1") c.Assert(index, NotNil) - h, err := t.AddRecord(ctx, []interface{}{11, 11, 11}) + h, err := t.AddRecord(ctx, []interface{}{num + 1, 1, 1}) c.Assert(err, IsNil) - h1, err := t.AddRecord(ctx, []interface{}{11, 11, 11}) + h1, err := t.AddRecord(ctx, []interface{}{num + 1, 1, 1}) c.Assert(err, NotNil) c.Assert(h, Equals, h1) @@ -176,6 +177,7 @@ func (s *testIndexSuite) TestIndexWait(c *C) { ticker := time.NewTicker(d.lease) done := make(chan *model.Job, 1) + ctx.FinishTxn(false) go func() { done <- s.testCreateIndex(c, ctx, d, tblInfo, true, "c1_uni", "c1") }() @@ -227,6 +229,9 @@ LOOP: c.Assert(exist, IsTrue) } + err = ctx.FinishTxn(false) + c.Assert(err, IsNil) + d.start() } } @@ -234,6 +239,7 @@ LOOP: t = testGetTable(c, d, s.dbInfo.ID, tblInfo.ID) c.Assert(t.FindIndexByColName("c1"), NotNil) + ctx.FinishTxn(false) go func() { done <- s.testDropIndex(c, ctx, d, tblInfo, "c1_uni") }() @@ -284,6 +290,9 @@ LOOP1: c.Assert(exist, IsTrue) } + err = ctx.FinishTxn(false) + c.Assert(err, IsNil) + d.start() } } @@ -293,5 +302,5 @@ LOOP1: } func init() { - log.SetLevelByString("warn") + log.SetLevelByString("error") }