diff --git a/ddl/column.go b/ddl/column.go index 469c8fc6a1..2cc43f9184 100644 --- a/ddl/column.go +++ b/ddl/column.go @@ -322,6 +322,7 @@ func (d *ddl) backfillColumn(t table.Table, columnInfo *model.ColumnInfo, reorgI } func (d *ddl) backfillColumnData(t table.Table, columnInfo *model.ColumnInfo, handles []int64, reorgInfo *reorgInfo) error { + log.Infof("[ddl] backfill column handles %v", len(handles)) var ( defaultVal types.Datum err error @@ -339,7 +340,7 @@ func (d *ddl) backfillColumnData(t table.Table, columnInfo *model.ColumnInfo, ha colMap[col.ID] = &col.FieldType } for _, handle := range handles { - log.Info("[ddl] backfill column...", handle) + log.Debug("[ddl] backfill column...", handle) err := kv.RunInNewTxn(d.store, true, func(txn kv.Transaction) error { if err := d.isReorgRunnable(txn, ddlJobFlag); err != nil { return errors.Trace(err) diff --git a/ddl/ddl_db_test.go b/ddl/ddl_db_test.go index 0bc180bebc..ec5d79a535 100644 --- a/ddl/ddl_db_test.go +++ b/ddl/ddl_db_test.go @@ -78,8 +78,9 @@ func (s *testDBSuite) SetUpSuite(c *C) { } func (s *testDBSuite) TearDownSuite(c *C) { - s.db.Close() + localstore.MockRemoteStore = false + s.db.Close() s.s.Close() } diff --git a/ddl/index.go b/ddl/index.go index 7d88f4ad64..954aede792 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -310,7 +310,8 @@ func (d *ddl) onDropIndex(t *meta.Meta, job *model.Job) error { } } -func fetchRowColVals(txn kv.Transaction, t table.Table, handle int64, indexInfo *model.IndexInfo) ([]types.Datum, error) { +func fetchRowColVals(txn kv.Transaction, t table.Table, handle int64, indexInfo *model.IndexInfo) ( + kv.Key, []types.Datum, error) { // fetch datas cols := t.Cols() colMap := make(map[int64]*types.FieldType) @@ -321,18 +322,18 @@ func fetchRowColVals(txn kv.Transaction, t table.Table, handle int64, indexInfo rowKey := tablecodec.EncodeRecordKey(t.RecordPrefix(), handle) rowVal, err := txn.Get(rowKey) if err != nil { - return nil, errors.Trace(err) + return nil, nil, errors.Trace(err) } row, err := tablecodec.DecodeRow(rowVal, colMap) if err != nil { - return nil, errors.Trace(err) + return nil, nil, errors.Trace(err) } vals := make([]types.Datum, 0, len(indexInfo.Columns)) for _, v := range indexInfo.Columns { col := cols[v.Offset] vals = append(vals, row[col.ID]) } - return vals, nil + return rowKey, vals, nil } const maxBatchSize = 1024 @@ -365,14 +366,12 @@ func (d *ddl) addTableIndex(t table.Table, indexInfo *model.IndexInfo, reorgInfo func (d *ddl) getSnapshotRows(t table.Table, version uint64, seekHandle int64) ([]int64, error) { ver := kv.Version{Ver: version} - snap, err := d.store.GetSnapshot(ver) if err != nil { return nil, errors.Trace(err) } firstKey := t.RecordKey(seekHandle) - it, err := snap.Seek(firstKey) if err != nil { return nil, errors.Trace(err) @@ -380,7 +379,6 @@ func (d *ddl) getSnapshotRows(t table.Table, version uint64, seekHandle int64) ( defer it.Close() handles := make([]int64, 0, maxBatchSize) - for it.Valid() { if !it.Key().HasPrefix(t.RecordPrefix()) { break @@ -392,13 +390,12 @@ func (d *ddl) getSnapshotRows(t table.Table, version uint64, seekHandle int64) ( return nil, errors.Trace(err) } - rk := t.RecordKey(handle) - handles = append(handles, handle) if len(handles) == maxBatchSize { break } + rk := t.RecordKey(handle) err = kv.NextUntil(it, util.RowKeyPrefixFilter(rk)) if terror.ErrorEqual(err, kv.ErrNotExist) { break @@ -412,16 +409,17 @@ func (d *ddl) getSnapshotRows(t table.Table, version uint64, seekHandle int64) ( func (d *ddl) backfillTableIndex(t table.Table, indexInfo *model.IndexInfo, handles []int64, reorgInfo *reorgInfo) error { kvX := tables.NewIndex(t.Meta(), indexInfo) + log.Infof("[ddl] backfill index %v rows ", len(handles)) for _, handle := range handles { - log.Debug("[ddl] building index...", handle) + log.Debug("[ddl] backfill index...", handle) err := kv.RunInNewTxn(d.store, true, func(txn kv.Transaction) error { if err := d.isReorgRunnable(txn, ddlJobFlag); err != nil { return errors.Trace(err) } - vals, err1 := fetchRowColVals(txn, t, handle, indexInfo) + rowKey, vals, err1 := fetchRowColVals(txn, t, handle, indexInfo) if terror.ErrorEqual(err1, kv.ErrNotExist) { // row doesn't exist, skip it. return nil @@ -437,7 +435,6 @@ func (d *ddl) backfillTableIndex(t table.Table, indexInfo *model.IndexInfo, hand // index already exists, skip it. return nil } - rowKey := tablecodec.EncodeRecordKey(t.RecordPrefix(), handle) err1 = txn.LockKeys(rowKey) if err1 != nil { return errors.Trace(err1) diff --git a/ddl/reorg.go b/ddl/reorg.go index da3d9ee85e..1f84cd9821 100644 --- a/ddl/reorg.go +++ b/ddl/reorg.go @@ -130,12 +130,15 @@ func (d *ddl) runReorgJob(f func() error) error { // wait reorganization job done or timeout select { case err := <-d.reorgDoneCh: + log.Info("[ddl] run reorg job done") d.reorgDoneCh = nil return errors.Trace(err) case <-d.quitCh: + log.Info("[ddl] run reorg job ddl quit") // we return errWaitReorgTimeout here too, so that outer loop will break. return errWaitReorgTimeout case <-time.After(waitTimeout): + log.Infof("[ddl] run reorg job wait timeout :%v", waitTimeout) // if timeout, we will return, check the owner and retry to wait job done again. return errWaitReorgTimeout } @@ -174,8 +177,8 @@ func (d *ddl) delKeysWithPrefix(prefix kv.Key, jobType JobType) error { if err != nil { return errors.Trace(err) } - defer iter.Close() + for i := 0; i < maxBatchSize; i++ { if iter.Valid() && iter.Key().HasPrefix(prefix) { keys = append(keys, iter.Key().Clone()) @@ -188,6 +191,7 @@ func (d *ddl) delKeysWithPrefix(prefix kv.Key, jobType JobType) error { } } + log.Infof("[ddl] delete %v keys with prefix %q", len(keys), prefix) for _, key := range keys { err := txn.Delete(key) // must skip ErrNotExist