ddl: simplify adding index and add more test.
This commit is contained in:
141
ddl/index.go
141
ddl/index.go
@ -165,20 +165,23 @@ func (d *ddl) onIndexCreate(t *meta.Meta, job *model.Job) error {
|
||||
// write only -> public
|
||||
job.SchemaState = model.StateReorgnization
|
||||
indexInfo.State = model.StateReorgnization
|
||||
|
||||
// get the current version for later Reorgnization.
|
||||
var ver kv.Version
|
||||
ver, err = d.store.CurrentVersion()
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
job.SnapshotVer = ver.Ver
|
||||
|
||||
// initialize SnapshotVer to 0 for later reorgnization check.
|
||||
job.SnapshotVer = 0
|
||||
err = t.UpdateTable(schemaID, tblInfo)
|
||||
return errors.Trace(err)
|
||||
case model.StateReorgnization:
|
||||
// reorganization -> public
|
||||
// get the current version for reorgnization if we don't have
|
||||
if job.SnapshotVer == 0 {
|
||||
var ver kv.Version
|
||||
ver, err = d.store.CurrentVersion()
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
job.SnapshotVer = ver.Ver
|
||||
}
|
||||
|
||||
var tbl table.Table
|
||||
tbl, err = d.getTable(t, schemaID, tblInfo)
|
||||
if err != nil {
|
||||
@ -314,7 +317,7 @@ func checkRowExist(txn kv.Transaction, t table.Table, handle int64) (bool, error
|
||||
return true, nil
|
||||
}
|
||||
|
||||
func fetchCurrentRowColVals(txn kv.Transaction, t table.Table, handle int64, indexInfo *model.IndexInfo) ([]interface{}, error) {
|
||||
func fetchRowColVals(txn kv.Transaction, t table.Table, handle int64, indexInfo *model.IndexInfo) ([]interface{}, error) {
|
||||
// fetch datas
|
||||
cols := t.Cols()
|
||||
var vals []interface{}
|
||||
@ -337,52 +340,12 @@ func fetchCurrentRowColVals(txn kv.Transaction, t table.Table, handle int64, ind
|
||||
return vals, nil
|
||||
}
|
||||
|
||||
func fetchSnapRowColVals(snap kv.MvccSnapshot, ver kv.Version, t table.Table, handle int64, indexInfo *model.IndexInfo) ([]interface{}, error) {
|
||||
// fetch datas
|
||||
cols := t.Cols()
|
||||
var vals []interface{}
|
||||
for _, v := range indexInfo.Columns {
|
||||
var val interface{}
|
||||
|
||||
col := cols[v.Offset]
|
||||
k := t.RecordKey(handle, col)
|
||||
data, err := snap.MvccGet(kv.EncodeKey([]byte(k)), ver)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
val, err = t.DecodeValue(data, col)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
vals = append(vals, val)
|
||||
}
|
||||
|
||||
return vals, nil
|
||||
}
|
||||
|
||||
func (d *ddl) needAddingIndexForRow(txn kv.Transaction, t table.Table, handle int64, kvIndex kv.Index, indexInfo *model.IndexInfo) (bool, error) {
|
||||
if ok, err := checkRowExist(txn, t, handle); err != nil {
|
||||
return false, errors.Trace(err)
|
||||
} else if !ok {
|
||||
// if row doesn't exist, we don't need to add index
|
||||
return false, nil
|
||||
}
|
||||
|
||||
vals, err := fetchCurrentRowColVals(txn, t, handle, indexInfo)
|
||||
if err != nil {
|
||||
return false, errors.Trace(err)
|
||||
}
|
||||
|
||||
if ok, _, err := kvIndex.Exist(txn, vals, handle); err != nil {
|
||||
return false, errors.Trace(err)
|
||||
} else if ok {
|
||||
// index exists, we don't need to add again
|
||||
return false, nil
|
||||
}
|
||||
|
||||
return true, nil
|
||||
}
|
||||
|
||||
// How to add index in reorgnization state?
|
||||
// 1, Generate a snapshot with special version.
|
||||
// 2, Traverse the snapshot, get every row in the table.
|
||||
// 3, For one row, if the row has been already deleted, skip to next row.
|
||||
// 4, If not deleted, check whether index has existed, if existed, skip to next row.
|
||||
// 5, If index doesn't exist, create the index and then continue to handle next row.
|
||||
func (d *ddl) addTableIndex(t table.Table, indexInfo *model.IndexInfo, version uint64) error {
|
||||
ver := kv.Version{Ver: version}
|
||||
|
||||
@ -396,13 +359,6 @@ func (d *ddl) addTableIndex(t table.Table, indexInfo *model.IndexInfo, version u
|
||||
firstKey := t.FirstKey()
|
||||
prefix := []byte(t.KeyPrefix())
|
||||
|
||||
ctx := d.newReorgContext()
|
||||
txn, err := ctx.GetTxn(true)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
defer txn.Rollback()
|
||||
|
||||
it := snap.NewMvccIterator(kv.EncodeKey([]byte(firstKey)), ver)
|
||||
defer it.Close()
|
||||
|
||||
@ -422,43 +378,46 @@ func (d *ddl) addTableIndex(t table.Table, indexInfo *model.IndexInfo, version u
|
||||
|
||||
log.Info("building index...", handle)
|
||||
|
||||
// first check need adding index or not.
|
||||
var need bool
|
||||
need, err = d.needAddingIndexForRow(txn, t, handle, kvX, indexInfo)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
// the first key in one row is the lock.
|
||||
lock := t.RecordKey(handle, nil)
|
||||
err = kv.RunInNewTxn(d.store, true, func(txn kv.Transaction) error {
|
||||
// first check row exists
|
||||
var exist bool
|
||||
exist, err = checkRowExist(txn, t, handle)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
} else if !exist {
|
||||
// row doesn't exist, skip it.
|
||||
return nil
|
||||
}
|
||||
|
||||
if need {
|
||||
var vals []interface{}
|
||||
vals, err = fetchSnapRowColVals(snap, ver, t, handle, indexInfo)
|
||||
vals, err = fetchRowColVals(txn, t, handle, indexInfo)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
var indexExist bool
|
||||
indexExist, _, err = kvX.Exist(txn, vals, handle)
|
||||
exist, _, err = kvX.Exist(txn, vals, handle)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
} else if exist {
|
||||
// index already exists, skip it.
|
||||
return nil
|
||||
}
|
||||
|
||||
// mean we haven't already added this index.
|
||||
// lock row first
|
||||
err = txn.LockKeys(lock)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
if !indexExist {
|
||||
// mean we haven't already added this index.
|
||||
// should lock row here???
|
||||
err = t.LockRow(ctx, handle, true)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
// create the index.
|
||||
err = kvX.Create(txn, vals, handle)
|
||||
return errors.Trace(err)
|
||||
})
|
||||
|
||||
// create the index.
|
||||
err = kvX.Create(txn, vals, handle)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
rk := kv.EncodeKey([]byte(t.RecordKey(handle, nil)))
|
||||
rk := kv.EncodeKey(lock)
|
||||
it, err = kv.NextUntil(it, util.RowKeyPrefixFilter(rk))
|
||||
if errors2.ErrorEqual(err, kv.ErrNotExist) {
|
||||
break
|
||||
@ -467,7 +426,7 @@ func (d *ddl) addTableIndex(t table.Table, indexInfo *model.IndexInfo, version u
|
||||
}
|
||||
}
|
||||
|
||||
return errors.Trace(txn.Commit())
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *ddl) dropTableIndex(t table.Table, indexInfo *model.IndexInfo) error {
|
||||
|
||||
@ -14,7 +14,7 @@
|
||||
package ddl
|
||||
|
||||
import (
|
||||
"io"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/ngaut/log"
|
||||
@ -24,6 +24,7 @@ import (
|
||||
"github.com/pingcap/tidb/kv"
|
||||
"github.com/pingcap/tidb/model"
|
||||
"github.com/pingcap/tidb/parser/coldef"
|
||||
"github.com/pingcap/tidb/table"
|
||||
"github.com/pingcap/tidb/util/mock"
|
||||
)
|
||||
|
||||
@ -51,7 +52,7 @@ func (s *testIndexSuite) TearDownSuite(c *C) {
|
||||
s.store.Close()
|
||||
}
|
||||
|
||||
func (s *testIndexSuite) testCreateIndex(c *C, ctx context.Context, tblInfo *model.TableInfo, unique bool, indexName string, colName string) *model.Job {
|
||||
func (s *testIndexSuite) testCreateIndex(c *C, ctx context.Context, d *ddl, tblInfo *model.TableInfo, unique bool, indexName string, colName string) *model.Job {
|
||||
job := &model.Job{
|
||||
SchemaID: s.dbInfo.ID,
|
||||
TableID: tblInfo.ID,
|
||||
@ -59,12 +60,12 @@ func (s *testIndexSuite) testCreateIndex(c *C, ctx context.Context, tblInfo *mod
|
||||
Args: []interface{}{unique, model.NewCIStr(indexName), []*coldef.IndexColName{{ColumnName: colName, Length: 256}}},
|
||||
}
|
||||
|
||||
err := s.d.startJob(ctx, job)
|
||||
err := d.startJob(ctx, job)
|
||||
c.Assert(err, IsNil)
|
||||
return job
|
||||
}
|
||||
|
||||
func (s *testIndexSuite) testDropIndex(c *C, ctx context.Context, tblInfo *model.TableInfo, indexName string) *model.Job {
|
||||
func (s *testIndexSuite) testDropIndex(c *C, ctx context.Context, d *ddl, tblInfo *model.TableInfo, indexName string) *model.Job {
|
||||
job := &model.Job{
|
||||
SchemaID: s.dbInfo.ID,
|
||||
TableID: tblInfo.ID,
|
||||
@ -72,7 +73,7 @@ func (s *testIndexSuite) testDropIndex(c *C, ctx context.Context, tblInfo *model
|
||||
Args: []interface{}{model.NewCIStr(indexName)},
|
||||
}
|
||||
|
||||
err := s.d.startJob(ctx, job)
|
||||
err := d.startJob(ctx, job)
|
||||
c.Assert(err, IsNil)
|
||||
return job
|
||||
}
|
||||
@ -104,7 +105,7 @@ func (s *testIndexSuite) TestIndex(c *C) {
|
||||
return true, nil
|
||||
})
|
||||
|
||||
job := s.testCreateIndex(c, ctx, tblInfo, true, "c1_uni", "c1")
|
||||
job := s.testCreateIndex(c, ctx, s.d, tblInfo, true, "c1_uni", "c1")
|
||||
testCheckJobDone(c, s.d, job, true)
|
||||
|
||||
t = testGetTable(c, s.d, s.dbInfo.ID, tblInfo.ID)
|
||||
@ -118,31 +119,178 @@ func (s *testIndexSuite) TestIndex(c *C) {
|
||||
c.Assert(err, NotNil)
|
||||
c.Assert(h, Equals, h1)
|
||||
|
||||
_, err = t.AddRecord(ctx, []interface{}{1, 1, 1})
|
||||
h, err = t.AddRecord(ctx, []interface{}{1, 1, 1})
|
||||
c.Assert(err, NotNil)
|
||||
|
||||
it, _, err := index.X.Seek(txn, []interface{}{1})
|
||||
txn, err = ctx.GetTxn(true)
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(it, NotNil)
|
||||
|
||||
_, h2, err := it.Next()
|
||||
exist, _, err := index.X.Exist(txn, []interface{}{1}, h)
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(h, Equals, h2)
|
||||
c.Assert(exist, IsTrue)
|
||||
|
||||
it.Close()
|
||||
|
||||
s.testDropIndex(c, ctx, tblInfo, "c1_uni")
|
||||
s.testDropIndex(c, ctx, s.d, tblInfo, "c1_uni")
|
||||
|
||||
t = testGetTable(c, s.d, s.dbInfo.ID, tblInfo.ID)
|
||||
index1 := t.FindIndexByColName("c1")
|
||||
c.Assert(index1, IsNil)
|
||||
|
||||
it, _, _ = index.X.Seek(txn, []interface{}{1})
|
||||
c.Assert(it, NotNil)
|
||||
_, _, err = it.Next()
|
||||
c.Assert(err.Error(), Equals, io.EOF.Error())
|
||||
txn, err = ctx.GetTxn(true)
|
||||
c.Assert(err, IsNil)
|
||||
|
||||
exist, _, err = index.X.Exist(txn, []interface{}{1}, h)
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(exist, IsFalse)
|
||||
}
|
||||
|
||||
func testGetIndex(t table.Table, name string) *column.IndexedCol {
|
||||
for _, idx := range t.Indices() {
|
||||
// only public index can be read.
|
||||
|
||||
if len(idx.Columns) == 1 && strings.EqualFold(idx.Columns[0].Name.L, name) {
|
||||
return idx
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *testIndexSuite) TestIndexWait(c *C) {
|
||||
d := newDDL(s.store, nil, nil, 100*time.Millisecond)
|
||||
defer d.close()
|
||||
|
||||
tblInfo := testTableInfo(c, d, "t")
|
||||
ctx := testNewContext(c, d)
|
||||
defer ctx.FinishTxn(true)
|
||||
|
||||
txn, err := ctx.GetTxn(true)
|
||||
c.Assert(err, IsNil)
|
||||
|
||||
testCreateTable(c, ctx, d, s.dbInfo, tblInfo)
|
||||
|
||||
t := testGetTable(c, d, s.dbInfo.ID, tblInfo.ID)
|
||||
|
||||
var h int64
|
||||
h, err = t.AddRecord(ctx, []interface{}{1, 1, 1})
|
||||
c.Assert(err, IsNil)
|
||||
|
||||
ticker := time.NewTicker(d.lease)
|
||||
done := make(chan *model.Job, 1)
|
||||
|
||||
go func() {
|
||||
done <- s.testCreateIndex(c, ctx, d, tblInfo, true, "c1_uni", "c1")
|
||||
}()
|
||||
|
||||
LOOP:
|
||||
for {
|
||||
select {
|
||||
case job := <-done:
|
||||
testCheckJobDone(c, d, job, true)
|
||||
break LOOP
|
||||
case <-ticker.C:
|
||||
d.close()
|
||||
|
||||
t = testGetTable(c, d, s.dbInfo.ID, tblInfo.ID)
|
||||
index := testGetIndex(t, "c1")
|
||||
if index == nil {
|
||||
d.start()
|
||||
continue
|
||||
}
|
||||
|
||||
err = t.RemoveRowAllIndex(ctx, h, []interface{}{1})
|
||||
c.Assert(err, IsNil)
|
||||
|
||||
err = t.RemoveRow(ctx, h)
|
||||
c.Assert(err, IsNil)
|
||||
|
||||
txn, err = ctx.GetTxn(true)
|
||||
c.Assert(err, IsNil)
|
||||
|
||||
exist, _, err := index.X.Exist(txn, []interface{}{1}, h)
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(exist, IsFalse)
|
||||
|
||||
h, err = t.AddRecord(ctx, []interface{}{1, 1, 1})
|
||||
c.Assert(err, IsNil)
|
||||
|
||||
txn, err = ctx.GetTxn(true)
|
||||
c.Assert(err, IsNil)
|
||||
|
||||
exist, _, err = index.X.Exist(txn, []interface{}{1}, h)
|
||||
c.Assert(err, IsNil)
|
||||
switch index.State {
|
||||
case model.StateDeleteOnly:
|
||||
c.Assert(exist, IsFalse)
|
||||
case model.StateNone:
|
||||
c.Fatalf("can be none state")
|
||||
default:
|
||||
c.Assert(exist, IsTrue)
|
||||
}
|
||||
|
||||
d.start()
|
||||
}
|
||||
}
|
||||
|
||||
t = testGetTable(c, d, s.dbInfo.ID, tblInfo.ID)
|
||||
c.Assert(t.FindIndexByColName("c1"), NotNil)
|
||||
|
||||
go func() {
|
||||
done <- s.testDropIndex(c, ctx, d, tblInfo, "c1_uni")
|
||||
}()
|
||||
|
||||
LOOP1:
|
||||
for {
|
||||
select {
|
||||
case job := <-done:
|
||||
testCheckJobDone(c, d, job, false)
|
||||
break LOOP1
|
||||
case <-ticker.C:
|
||||
d.close()
|
||||
|
||||
t = testGetTable(c, d, s.dbInfo.ID, tblInfo.ID)
|
||||
index := testGetIndex(t, "c1")
|
||||
if index == nil {
|
||||
d.start()
|
||||
continue
|
||||
}
|
||||
|
||||
err = t.RemoveRowAllIndex(ctx, h, []interface{}{1})
|
||||
c.Assert(err, IsNil)
|
||||
|
||||
err = t.RemoveRow(ctx, h)
|
||||
c.Assert(err, IsNil)
|
||||
|
||||
txn, err = ctx.GetTxn(true)
|
||||
c.Assert(err, IsNil)
|
||||
|
||||
exist, _, err := index.X.Exist(txn, []interface{}{1}, h)
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(exist, IsFalse)
|
||||
|
||||
h, err = t.AddRecord(ctx, []interface{}{1, 1, 1})
|
||||
c.Assert(err, IsNil)
|
||||
|
||||
txn, err = ctx.GetTxn(true)
|
||||
c.Assert(err, IsNil)
|
||||
|
||||
exist, _, err = index.X.Exist(txn, []interface{}{1}, h)
|
||||
c.Assert(err, IsNil)
|
||||
switch index.State {
|
||||
case model.StateDeleteOnly:
|
||||
c.Assert(exist, IsFalse)
|
||||
case model.StateNone:
|
||||
c.Fatalf("can be none state")
|
||||
default:
|
||||
c.Assert(exist, IsTrue)
|
||||
}
|
||||
|
||||
d.start()
|
||||
}
|
||||
}
|
||||
|
||||
t = testGetTable(c, d, s.dbInfo.ID, tblInfo.ID)
|
||||
c.Assert(t.FindIndexByColName("c1"), IsNil)
|
||||
}
|
||||
|
||||
func init() {
|
||||
log.SetLevelByString("info")
|
||||
log.SetLevelByString("warn")
|
||||
}
|
||||
|
||||
@ -32,7 +32,7 @@ func (d *ddl) startJob(ctx context.Context, job *model.Job) error {
|
||||
}
|
||||
|
||||
// Create a new job and queue it.
|
||||
err := kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error {
|
||||
err := kv.RunInNewTxn(d.store, true, func(txn kv.Transaction) error {
|
||||
t := meta.NewMeta(txn)
|
||||
var err error
|
||||
job.ID, err = t.GenGlobalID()
|
||||
|
||||
Reference in New Issue
Block a user