diff --git a/ddl/index.go b/ddl/index.go index 74358145ea..1a1dba0dfc 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -165,20 +165,23 @@ func (d *ddl) onIndexCreate(t *meta.Meta, job *model.Job) error { // write only -> public job.SchemaState = model.StateReorgnization indexInfo.State = model.StateReorgnization - - // get the current version for later Reorgnization. - var ver kv.Version - ver, err = d.store.CurrentVersion() - if err != nil { - return errors.Trace(err) - } - - job.SnapshotVer = ver.Ver - + // initialize SnapshotVer to 0 for later reorgnization check. + job.SnapshotVer = 0 err = t.UpdateTable(schemaID, tblInfo) return errors.Trace(err) case model.StateReorgnization: // reorganization -> public + // get the current version for reorgnization if we don't have + if job.SnapshotVer == 0 { + var ver kv.Version + ver, err = d.store.CurrentVersion() + if err != nil { + return errors.Trace(err) + } + + job.SnapshotVer = ver.Ver + } + var tbl table.Table tbl, err = d.getTable(t, schemaID, tblInfo) if err != nil { @@ -314,7 +317,7 @@ func checkRowExist(txn kv.Transaction, t table.Table, handle int64) (bool, error return true, nil } -func fetchCurrentRowColVals(txn kv.Transaction, t table.Table, handle int64, indexInfo *model.IndexInfo) ([]interface{}, error) { +func fetchRowColVals(txn kv.Transaction, t table.Table, handle int64, indexInfo *model.IndexInfo) ([]interface{}, error) { // fetch datas cols := t.Cols() var vals []interface{} @@ -337,52 +340,12 @@ func fetchCurrentRowColVals(txn kv.Transaction, t table.Table, handle int64, ind return vals, nil } -func fetchSnapRowColVals(snap kv.MvccSnapshot, ver kv.Version, t table.Table, handle int64, indexInfo *model.IndexInfo) ([]interface{}, error) { - // fetch datas - cols := t.Cols() - var vals []interface{} - for _, v := range indexInfo.Columns { - var val interface{} - - col := cols[v.Offset] - k := t.RecordKey(handle, col) - data, err := snap.MvccGet(kv.EncodeKey([]byte(k)), ver) - if err != nil { - return nil, errors.Trace(err) - } - val, err = t.DecodeValue(data, col) - if err != nil { - return nil, errors.Trace(err) - } - vals = append(vals, val) - } - - return vals, nil -} - -func (d *ddl) needAddingIndexForRow(txn kv.Transaction, t table.Table, handle int64, kvIndex kv.Index, indexInfo *model.IndexInfo) (bool, error) { - if ok, err := checkRowExist(txn, t, handle); err != nil { - return false, errors.Trace(err) - } else if !ok { - // if row doesn't exist, we don't need to add index - return false, nil - } - - vals, err := fetchCurrentRowColVals(txn, t, handle, indexInfo) - if err != nil { - return false, errors.Trace(err) - } - - if ok, _, err := kvIndex.Exist(txn, vals, handle); err != nil { - return false, errors.Trace(err) - } else if ok { - // index exists, we don't need to add again - return false, nil - } - - return true, nil -} - +// How to add index in reorgnization state? +// 1, Generate a snapshot with special version. +// 2, Traverse the snapshot, get every row in the table. +// 3, For one row, if the row has been already deleted, skip to next row. +// 4, If not deleted, check whether index has existed, if existed, skip to next row. +// 5, If index doesn't exist, create the index and then continue to handle next row. func (d *ddl) addTableIndex(t table.Table, indexInfo *model.IndexInfo, version uint64) error { ver := kv.Version{Ver: version} @@ -396,13 +359,6 @@ func (d *ddl) addTableIndex(t table.Table, indexInfo *model.IndexInfo, version u firstKey := t.FirstKey() prefix := []byte(t.KeyPrefix()) - ctx := d.newReorgContext() - txn, err := ctx.GetTxn(true) - if err != nil { - return errors.Trace(err) - } - defer txn.Rollback() - it := snap.NewMvccIterator(kv.EncodeKey([]byte(firstKey)), ver) defer it.Close() @@ -422,43 +378,46 @@ func (d *ddl) addTableIndex(t table.Table, indexInfo *model.IndexInfo, version u log.Info("building index...", handle) - // first check need adding index or not. - var need bool - need, err = d.needAddingIndexForRow(txn, t, handle, kvX, indexInfo) - if err != nil { - return errors.Trace(err) - } + // the first key in one row is the lock. + lock := t.RecordKey(handle, nil) + err = kv.RunInNewTxn(d.store, true, func(txn kv.Transaction) error { + // first check row exists + var exist bool + exist, err = checkRowExist(txn, t, handle) + if err != nil { + return errors.Trace(err) + } else if !exist { + // row doesn't exist, skip it. + return nil + } - if need { var vals []interface{} - vals, err = fetchSnapRowColVals(snap, ver, t, handle, indexInfo) + vals, err = fetchRowColVals(txn, t, handle, indexInfo) if err != nil { return errors.Trace(err) } - var indexExist bool - indexExist, _, err = kvX.Exist(txn, vals, handle) + exist, _, err = kvX.Exist(txn, vals, handle) + if err != nil { + return errors.Trace(err) + } else if exist { + // index already exists, skip it. + return nil + } + + // mean we haven't already added this index. + // lock row first + err = txn.LockKeys(lock) if err != nil { return errors.Trace(err) } - if !indexExist { - // mean we haven't already added this index. - // should lock row here??? - err = t.LockRow(ctx, handle, true) - if err != nil { - return errors.Trace(err) - } + // create the index. + err = kvX.Create(txn, vals, handle) + return errors.Trace(err) + }) - // create the index. - err = kvX.Create(txn, vals, handle) - if err != nil { - return errors.Trace(err) - } - } - } - - rk := kv.EncodeKey([]byte(t.RecordKey(handle, nil))) + rk := kv.EncodeKey(lock) it, err = kv.NextUntil(it, util.RowKeyPrefixFilter(rk)) if errors2.ErrorEqual(err, kv.ErrNotExist) { break @@ -467,7 +426,7 @@ func (d *ddl) addTableIndex(t table.Table, indexInfo *model.IndexInfo, version u } } - return errors.Trace(txn.Commit()) + return nil } func (d *ddl) dropTableIndex(t table.Table, indexInfo *model.IndexInfo) error { diff --git a/ddl/index_test.go b/ddl/index_test.go index 83b8b9513a..1ede5d823e 100644 --- a/ddl/index_test.go +++ b/ddl/index_test.go @@ -14,7 +14,7 @@ package ddl import ( - "io" + "strings" "time" "github.com/ngaut/log" @@ -24,6 +24,7 @@ import ( "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/model" "github.com/pingcap/tidb/parser/coldef" + "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/util/mock" ) @@ -51,7 +52,7 @@ func (s *testIndexSuite) TearDownSuite(c *C) { s.store.Close() } -func (s *testIndexSuite) testCreateIndex(c *C, ctx context.Context, tblInfo *model.TableInfo, unique bool, indexName string, colName string) *model.Job { +func (s *testIndexSuite) testCreateIndex(c *C, ctx context.Context, d *ddl, tblInfo *model.TableInfo, unique bool, indexName string, colName string) *model.Job { job := &model.Job{ SchemaID: s.dbInfo.ID, TableID: tblInfo.ID, @@ -59,12 +60,12 @@ func (s *testIndexSuite) testCreateIndex(c *C, ctx context.Context, tblInfo *mod Args: []interface{}{unique, model.NewCIStr(indexName), []*coldef.IndexColName{{ColumnName: colName, Length: 256}}}, } - err := s.d.startJob(ctx, job) + err := d.startJob(ctx, job) c.Assert(err, IsNil) return job } -func (s *testIndexSuite) testDropIndex(c *C, ctx context.Context, tblInfo *model.TableInfo, indexName string) *model.Job { +func (s *testIndexSuite) testDropIndex(c *C, ctx context.Context, d *ddl, tblInfo *model.TableInfo, indexName string) *model.Job { job := &model.Job{ SchemaID: s.dbInfo.ID, TableID: tblInfo.ID, @@ -72,7 +73,7 @@ func (s *testIndexSuite) testDropIndex(c *C, ctx context.Context, tblInfo *model Args: []interface{}{model.NewCIStr(indexName)}, } - err := s.d.startJob(ctx, job) + err := d.startJob(ctx, job) c.Assert(err, IsNil) return job } @@ -104,7 +105,7 @@ func (s *testIndexSuite) TestIndex(c *C) { return true, nil }) - job := s.testCreateIndex(c, ctx, tblInfo, true, "c1_uni", "c1") + job := s.testCreateIndex(c, ctx, s.d, tblInfo, true, "c1_uni", "c1") testCheckJobDone(c, s.d, job, true) t = testGetTable(c, s.d, s.dbInfo.ID, tblInfo.ID) @@ -118,31 +119,178 @@ func (s *testIndexSuite) TestIndex(c *C) { c.Assert(err, NotNil) c.Assert(h, Equals, h1) - _, err = t.AddRecord(ctx, []interface{}{1, 1, 1}) + h, err = t.AddRecord(ctx, []interface{}{1, 1, 1}) c.Assert(err, NotNil) - it, _, err := index.X.Seek(txn, []interface{}{1}) + txn, err = ctx.GetTxn(true) c.Assert(err, IsNil) - c.Assert(it, NotNil) - _, h2, err := it.Next() + exist, _, err := index.X.Exist(txn, []interface{}{1}, h) c.Assert(err, IsNil) - c.Assert(h, Equals, h2) + c.Assert(exist, IsTrue) - it.Close() - - s.testDropIndex(c, ctx, tblInfo, "c1_uni") + s.testDropIndex(c, ctx, s.d, tblInfo, "c1_uni") t = testGetTable(c, s.d, s.dbInfo.ID, tblInfo.ID) index1 := t.FindIndexByColName("c1") c.Assert(index1, IsNil) - it, _, _ = index.X.Seek(txn, []interface{}{1}) - c.Assert(it, NotNil) - _, _, err = it.Next() - c.Assert(err.Error(), Equals, io.EOF.Error()) + txn, err = ctx.GetTxn(true) + c.Assert(err, IsNil) + + exist, _, err = index.X.Exist(txn, []interface{}{1}, h) + c.Assert(err, IsNil) + c.Assert(exist, IsFalse) +} + +func testGetIndex(t table.Table, name string) *column.IndexedCol { + for _, idx := range t.Indices() { + // only public index can be read. + + if len(idx.Columns) == 1 && strings.EqualFold(idx.Columns[0].Name.L, name) { + return idx + } + } + return nil +} + +func (s *testIndexSuite) TestIndexWait(c *C) { + d := newDDL(s.store, nil, nil, 100*time.Millisecond) + defer d.close() + + tblInfo := testTableInfo(c, d, "t") + ctx := testNewContext(c, d) + defer ctx.FinishTxn(true) + + txn, err := ctx.GetTxn(true) + c.Assert(err, IsNil) + + testCreateTable(c, ctx, d, s.dbInfo, tblInfo) + + t := testGetTable(c, d, s.dbInfo.ID, tblInfo.ID) + + var h int64 + h, err = t.AddRecord(ctx, []interface{}{1, 1, 1}) + c.Assert(err, IsNil) + + ticker := time.NewTicker(d.lease) + done := make(chan *model.Job, 1) + + go func() { + done <- s.testCreateIndex(c, ctx, d, tblInfo, true, "c1_uni", "c1") + }() + +LOOP: + for { + select { + case job := <-done: + testCheckJobDone(c, d, job, true) + break LOOP + case <-ticker.C: + d.close() + + t = testGetTable(c, d, s.dbInfo.ID, tblInfo.ID) + index := testGetIndex(t, "c1") + if index == nil { + d.start() + continue + } + + err = t.RemoveRowAllIndex(ctx, h, []interface{}{1}) + c.Assert(err, IsNil) + + err = t.RemoveRow(ctx, h) + c.Assert(err, IsNil) + + txn, err = ctx.GetTxn(true) + c.Assert(err, IsNil) + + exist, _, err := index.X.Exist(txn, []interface{}{1}, h) + c.Assert(err, IsNil) + c.Assert(exist, IsFalse) + + h, err = t.AddRecord(ctx, []interface{}{1, 1, 1}) + c.Assert(err, IsNil) + + txn, err = ctx.GetTxn(true) + c.Assert(err, IsNil) + + exist, _, err = index.X.Exist(txn, []interface{}{1}, h) + c.Assert(err, IsNil) + switch index.State { + case model.StateDeleteOnly: + c.Assert(exist, IsFalse) + case model.StateNone: + c.Fatalf("can be none state") + default: + c.Assert(exist, IsTrue) + } + + d.start() + } + } + + t = testGetTable(c, d, s.dbInfo.ID, tblInfo.ID) + c.Assert(t.FindIndexByColName("c1"), NotNil) + + go func() { + done <- s.testDropIndex(c, ctx, d, tblInfo, "c1_uni") + }() + +LOOP1: + for { + select { + case job := <-done: + testCheckJobDone(c, d, job, false) + break LOOP1 + case <-ticker.C: + d.close() + + t = testGetTable(c, d, s.dbInfo.ID, tblInfo.ID) + index := testGetIndex(t, "c1") + if index == nil { + d.start() + continue + } + + err = t.RemoveRowAllIndex(ctx, h, []interface{}{1}) + c.Assert(err, IsNil) + + err = t.RemoveRow(ctx, h) + c.Assert(err, IsNil) + + txn, err = ctx.GetTxn(true) + c.Assert(err, IsNil) + + exist, _, err := index.X.Exist(txn, []interface{}{1}, h) + c.Assert(err, IsNil) + c.Assert(exist, IsFalse) + + h, err = t.AddRecord(ctx, []interface{}{1, 1, 1}) + c.Assert(err, IsNil) + + txn, err = ctx.GetTxn(true) + c.Assert(err, IsNil) + + exist, _, err = index.X.Exist(txn, []interface{}{1}, h) + c.Assert(err, IsNil) + switch index.State { + case model.StateDeleteOnly: + c.Assert(exist, IsFalse) + case model.StateNone: + c.Fatalf("can be none state") + default: + c.Assert(exist, IsTrue) + } + + d.start() + } + } + + t = testGetTable(c, d, s.dbInfo.ID, tblInfo.ID) + c.Assert(t.FindIndexByColName("c1"), IsNil) } func init() { - log.SetLevelByString("info") + log.SetLevelByString("warn") } diff --git a/ddl/worker.go b/ddl/worker.go index c5d5e760b5..601ff47cf9 100644 --- a/ddl/worker.go +++ b/ddl/worker.go @@ -32,7 +32,7 @@ func (d *ddl) startJob(ctx context.Context, job *model.Job) error { } // Create a new job and queue it. - err := kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error { + err := kv.RunInNewTxn(d.store, true, func(txn kv.Transaction) error { t := meta.NewMeta(txn) var err error job.ID, err = t.GenGlobalID()