diff --git a/ddl/ddl.go b/ddl/ddl.go
index c484c7b23e..3c5d41b3c2 100644
--- a/ddl/ddl.go
+++ b/ddl/ddl.go
@@ -76,6 +76,10 @@ type ddl struct {
 	// use a persistent job list.
 	reOrgDoneCh chan error
 
+	// reOrgHandle is the latest handle processed by the running data reorganization.
+	// It is updated after every batch so we can refresh the job's ReOrgHandle later.
+	reOrgHandle int64
+
 	quitCh chan struct{}
 	wait   sync.WaitGroup
 }
diff --git a/ddl/index.go b/ddl/index.go
index 0670a7f3cc..cfdc2aee02 100644
--- a/ddl/index.go
+++ b/ddl/index.go
@@ -16,6 +16,7 @@ package ddl
 import (
 	"bytes"
 	"strings"
+	"sync/atomic"
 
 	"github.com/juju/errors"
 	"github.com/ngaut/log"
@@ -193,6 +194,9 @@ func (d *ddl) onIndexCreate(t *meta.Meta, job *model.Job) error {
 		indexInfo.State = model.StateReorgnization
 		// initialize SnapshotVer to 0 for later reorgnization check.
 		job.SnapshotVer = 0
+		// initialize the reorg handle to 0 for a fresh reorganization.
+		job.ReOrgHandle = 0
+		atomic.StoreInt64(&d.reOrgHandle, 0)
 		err = t.UpdateTable(schemaID, tblInfo)
 		return errors.Trace(err)
 	case model.StateReorgnization:
@@ -215,9 +219,13 @@ func (d *ddl) onIndexCreate(t *meta.Meta, job *model.Job) error {
 		}
 
 		err = d.runReorgJob(func() error {
-			return d.addTableIndex(tbl, indexInfo, job.SnapshotVer)
+			return d.addTableIndex(tbl, indexInfo, job.SnapshotVer, job.ReOrgHandle)
 		})
 
+		// addTableIndex updates d.reOrgHandle after every batch,
+		// so copy the latest value into the job's ReOrgHandle here.
+		job.ReOrgHandle = atomic.LoadInt64(&d.reOrgHandle)
+
 		if errors2.ErrorEqual(err, errWaitReorgTimeout) {
 			// if timeout, we should return, check for the owner and re-wait job done.
 			return nil
@@ -377,8 +385,7 @@ const maxBatchSize = 1024
 // 3, For one row, if the row has been already deleted, skip to next row.
 // 4, If not deleted, check whether index has existed, if existed, skip to next row.
 // 5, If index doesn't exist, create the index and then continue to handle next row.
-func (d *ddl) addTableIndex(t table.Table, indexInfo *model.IndexInfo, version uint64) error {
-	seekHandle := int64(0)
+func (d *ddl) addTableIndex(t table.Table, indexInfo *model.IndexInfo, version uint64, seekHandle int64) error {
 	for {
 		handles, err := d.getSnapshotRows(t, version, seekHandle)
 		if err != nil {
@@ -388,12 +395,14 @@ func (d *ddl) addTableIndex(t table.Table, indexInfo *model.IndexInfo, version u
 		}
 
 		seekHandle = handles[len(handles)-1] + 1
-		// TODO: save seekHandle in reorgnization job, so we can resume this job later from this handle.
 
 		err = d.backfillTableIndex(t, indexInfo, handles)
 		if err != nil {
 			return errors.Trace(err)
 		}
+
+		// update reOrgHandle after every successful batch so the job can resume from it.
+		atomic.StoreInt64(&d.reOrgHandle, seekHandle)
 	}
 }
 
diff --git a/ddl/reorg.go b/ddl/reorg.go
index e17237b7ab..19c6e0f4b3 100644
--- a/ddl/reorg.go
+++ b/ddl/reorg.go
@@ -98,7 +98,6 @@ const waitReorgTimeout = 10 * time.Second
 var errWaitReorgTimeout = errors.New("wait for reorgnization timeout")
 
 func (d *ddl) runReorgJob(f func() error) error {
-	// TODO use persistent reorgnization job list.
 	if d.reOrgDoneCh == nil {
 		// start a reorgnization job
 		d.reOrgDoneCh = make(chan error, 1)
diff --git a/model/ddl.go b/model/ddl.go
index 33ee20554f..9f1f37e8ab 100644
--- a/model/ddl.go
+++ b/model/ddl.go
@@ -80,6 +80,11 @@ type Job struct {
 	// snapshot version for this job.
 	SnapshotVer  uint64 `json:"snapshot_ver"`
 	LastUpdateTS int64  `json:"last_update_ts"`
+	// For a reorganization job that adds data, such as add index or add column,
+	// we traverse the huge snapshot and add the data in batches.
+	// After a number of successful batches, we update the ReOrgHandle so the
+	// job can be resumed from it later if the current server crashes.
+	ReOrgHandle int64 `json:"reorg_handle"`
 }
 
 // Encode encodes job with json format.
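For reference, a minimal standalone Go sketch of the resume pattern this change introduces: the backfill loop publishes the last handle it has finished through an atomic variable after every successful batch, and the job owner copies that value into the persisted job so a restarted server can continue from it instead of from handle 0. The names reorgState, nextBatch, and backfill below are hypothetical stand-ins for ddl.reOrgHandle, getSnapshotRows, and addTableIndex, not the actual code.

package main

import (
	"fmt"
	"sync/atomic"
)

// reorgState plays the role of the ddl struct's reOrgHandle field: the next
// handle the reorganization should start from, published after each batch.
type reorgState struct {
	handle int64
}

// nextBatch stands in for getSnapshotRows: it returns up to batchSize row
// handles that are >= seekHandle.
func nextBatch(rows []int64, seekHandle int64, batchSize int) []int64 {
	var out []int64
	for _, h := range rows {
		if h >= seekHandle {
			out = append(out, h)
			if len(out) == batchSize {
				break
			}
		}
	}
	return out
}

// backfill mirrors the shape of addTableIndex: scan batches starting at
// seekHandle and publish progress atomically after every successful batch.
func backfill(st *reorgState, rows []int64, seekHandle int64, batchSize int) {
	for {
		handles := nextBatch(rows, seekHandle, batchSize)
		if len(handles) == 0 {
			return
		}
		seekHandle = handles[len(handles)-1] + 1
		// ... index entries for handles would be written here ...
		atomic.StoreInt64(&st.handle, seekHandle)
	}
}

func main() {
	st := &reorgState{}
	rows := []int64{1, 2, 3, 4, 5, 6, 7, 8}

	// First run: pretend the server crashed after backfilling two batches.
	backfill(st, rows[:6], 0, 3)
	saved := atomic.LoadInt64(&st.handle) // the value the job would persist as ReOrgHandle.
	fmt.Println("persisted handle:", saved)

	// After a restart, resume from the persisted handle instead of 0.
	backfill(st, rows, saved, 3)
	fmt.Println("final handle:", atomic.LoadInt64(&st.handle))
}

The atomic store/load matters because, in the diff, the backfill runs inside the goroutine started by runReorgJob while the DDL worker reads the progress into job.ReOrgHandle from another goroutine, possibly after a wait timeout.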