ddl: batch to add index.

This commit is contained in:
siddontang
2015-11-02 22:02:03 +08:00
parent 98160fc8ae
commit afdc869ef2
2 changed files with 81 additions and 35 deletions

View File

@ -369,6 +369,8 @@ func fetchRowColVals(txn kv.Transaction, t table.Table, handle int64, indexInfo
return vals, nil
}
const maxBatchSize = 1024
// How to add index in reorgnization state?
// 1, Generate a snapshot with special version.
// 2, Traverse the snapshot, get every row in the table.
@ -376,22 +378,42 @@ func fetchRowColVals(txn kv.Transaction, t table.Table, handle int64, indexInfo
// 4, If not deleted, check whether index has existed, if existed, skip to next row.
// 5, If index doesn't exist, create the index and then continue to handle next row.
func (d *ddl) addTableIndex(t table.Table, indexInfo *model.IndexInfo, version uint64) error {
seekHandle := int64(0)
for {
handles, err := d.getSnapshotRows(t, version, seekHandle)
if err != nil {
return errors.Trace(err)
} else if len(handles) == 0 {
return nil
}
seekHandle = handles[len(handles)-1] + 1
// TODO: save seekHandle in reorgnization job, so we can resume this job later from this handle.
err = d.backfillTableIndex(t, indexInfo, handles)
if err != nil {
return errors.Trace(err)
}
}
}
func (d *ddl) getSnapshotRows(t table.Table, version uint64, seekHandle int64) ([]int64, error) {
ver := kv.Version{Ver: version}
snap, err := d.store.GetSnapshot(ver)
if err != nil {
return errors.Trace(err)
return nil, errors.Trace(err)
}
defer snap.MvccRelease()
firstKey := t.FirstKey()
firstKey := t.RecordKey(seekHandle, nil)
prefix := []byte(t.KeyPrefix())
it := snap.NewMvccIterator(kv.EncodeKey([]byte(firstKey)), ver)
defer it.Close()
kvX := kv.NewKVIndex(t.IndexPrefix(), indexInfo.Name.L, indexInfo.Unique)
handles := make([]int64, 0, maxBatchSize)
for it.Valid() {
key := kv.DecodeKey([]byte(it.Key()))
@ -402,17 +424,39 @@ func (d *ddl) addTableIndex(t table.Table, indexInfo *model.IndexInfo, version u
var handle int64
handle, err = util.DecodeHandleFromRowKey(string(key))
if err != nil {
return errors.Trace(err)
return nil, errors.Trace(err)
}
log.Info("building index...", handle)
rk := kv.EncodeKey(t.RecordKey(handle, nil))
// the first key in one row is the lock.
lock := t.RecordKey(handle, nil)
err = kv.RunInNewTxn(d.store, true, func(txn kv.Transaction) error {
handles = append(handles, handle)
if len(handles) == maxBatchSize {
seekHandle = handle + 1
break
}
it, err = kv.NextUntil(it, util.RowKeyPrefixFilter(rk))
if errors2.ErrorEqual(err, kv.ErrNotExist) {
break
} else if err != nil {
return nil, errors.Trace(err)
}
}
snap.MvccRelease()
return handles, nil
}
func (d *ddl) backfillTableIndex(t table.Table, indexInfo *model.IndexInfo, handles []int64) error {
kvX := kv.NewKVIndex(t.IndexPrefix(), indexInfo.Name.L, indexInfo.Unique)
for _, handle := range handles {
log.Error("building index...", handle, handles)
err := kv.RunInNewTxn(d.store, true, func(txn kv.Transaction) error {
// first check row exists
var exist bool
exist, err = checkRowExist(txn, t, handle)
exist, err := checkRowExist(txn, t, handle)
if err != nil {
return errors.Trace(err)
} else if !exist {
@ -436,21 +480,18 @@ func (d *ddl) addTableIndex(t table.Table, indexInfo *model.IndexInfo, version u
// mean we haven't already added this index.
// lock row first
err = txn.LockKeys(lock)
err = txn.LockKeys(t.RecordKey(handle, nil))
if err != nil {
return errors.Trace(err)
}
// create the index.
log.Errorf("create index %d", handle)
err = kvX.Create(txn, vals, handle)
return errors.Trace(err)
})
rk := kv.EncodeKey(lock)
it, err = kv.NextUntil(it, util.RowKeyPrefixFilter(rk))
if errors2.ErrorEqual(err, kv.ErrNotExist) {
break
} else if err != nil {
if err != nil {
return errors.Trace(err)
}
}
@ -459,22 +500,18 @@ func (d *ddl) addTableIndex(t table.Table, indexInfo *model.IndexInfo, version u
}
func (d *ddl) dropTableIndex(t table.Table, indexInfo *model.IndexInfo) error {
ctx := d.newReorgContext()
txn, err := ctx.GetTxn(true)
if err != nil {
return errors.Trace(err)
}
// Remove indices.
for _, v := range t.Indices() {
if v != nil && v.X != nil && v.Name.L == indexInfo.Name.L {
if err = v.X.Drop(txn); err != nil {
ctx.FinishTxn(true)
return errors.Trace(err)
err := kv.RunInNewTxn(d.store, true, func(txn kv.Transaction) error {
// Remove indices.
for _, v := range t.Indices() {
if v != nil && v.X != nil && v.Name.L == indexInfo.Name.L {
if err := v.X.Drop(txn); err != nil {
return errors.Trace(err)
}
}
}
}
return errors.Trace(ctx.FinishTxn(false))
return nil
})
return errors.Trace(err)
}

View File

@ -90,7 +90,8 @@ func (s *testIndexSuite) TestIndex(c *C) {
t := testGetTable(c, s.d, s.dbInfo.ID, tblInfo.ID)
for i := 0; i < 10; i++ {
num := 10
for i := 0; i < num; i++ {
_, err = t.AddRecord(ctx, []interface{}{i, i, i})
c.Assert(err, IsNil)
}
@ -112,10 +113,10 @@ func (s *testIndexSuite) TestIndex(c *C) {
index := t.FindIndexByColName("c1")
c.Assert(index, NotNil)
h, err := t.AddRecord(ctx, []interface{}{11, 11, 11})
h, err := t.AddRecord(ctx, []interface{}{num + 1, 1, 1})
c.Assert(err, IsNil)
h1, err := t.AddRecord(ctx, []interface{}{11, 11, 11})
h1, err := t.AddRecord(ctx, []interface{}{num + 1, 1, 1})
c.Assert(err, NotNil)
c.Assert(h, Equals, h1)
@ -176,6 +177,7 @@ func (s *testIndexSuite) TestIndexWait(c *C) {
ticker := time.NewTicker(d.lease)
done := make(chan *model.Job, 1)
ctx.FinishTxn(false)
go func() {
done <- s.testCreateIndex(c, ctx, d, tblInfo, true, "c1_uni", "c1")
}()
@ -227,6 +229,9 @@ LOOP:
c.Assert(exist, IsTrue)
}
err = ctx.FinishTxn(false)
c.Assert(err, IsNil)
d.start()
}
}
@ -234,6 +239,7 @@ LOOP:
t = testGetTable(c, d, s.dbInfo.ID, tblInfo.ID)
c.Assert(t.FindIndexByColName("c1"), NotNil)
ctx.FinishTxn(false)
go func() {
done <- s.testDropIndex(c, ctx, d, tblInfo, "c1_uni")
}()
@ -284,6 +290,9 @@ LOOP1:
c.Assert(exist, IsTrue)
}
err = ctx.FinishTxn(false)
c.Assert(err, IsNil)
d.start()
}
}
@ -293,5 +302,5 @@ LOOP1:
}
func init() {
log.SetLevelByString("warn")
log.SetLevelByString("error")
}