*: add ReOrgHandle to support resume reorg job.
This commit is contained in:
@ -76,6 +76,10 @@ type ddl struct {
|
||||
// use a persistent job list.
|
||||
reOrgDoneCh chan error
|
||||
|
||||
// reOrgHandle is used for adding data reorgnization, after every batch,
|
||||
// we will update for later job update.
|
||||
reOrgHandle int64
|
||||
|
||||
quitCh chan struct{}
|
||||
wait sync.WaitGroup
|
||||
}
|
||||
|
||||
17
ddl/index.go
17
ddl/index.go
@ -16,6 +16,7 @@ package ddl
|
||||
import (
|
||||
"bytes"
|
||||
"strings"
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/juju/errors"
|
||||
"github.com/ngaut/log"
|
||||
@ -193,6 +194,9 @@ func (d *ddl) onIndexCreate(t *meta.Meta, job *model.Job) error {
|
||||
indexInfo.State = model.StateReorgnization
|
||||
// initialize SnapshotVer to 0 for later reorgnization check.
|
||||
job.SnapshotVer = 0
|
||||
// initialize reorg handle to 0
|
||||
job.ReOrgHandle = 0
|
||||
atomic.StoreInt64(&d.reOrgHandle, 0)
|
||||
err = t.UpdateTable(schemaID, tblInfo)
|
||||
return errors.Trace(err)
|
||||
case model.StateReorgnization:
|
||||
@ -215,9 +219,13 @@ func (d *ddl) onIndexCreate(t *meta.Meta, job *model.Job) error {
|
||||
}
|
||||
|
||||
err = d.runReorgJob(func() error {
|
||||
return d.addTableIndex(tbl, indexInfo, job.SnapshotVer)
|
||||
return d.addTableIndex(tbl, indexInfo, job.SnapshotVer, job.ReOrgHandle)
|
||||
})
|
||||
|
||||
// addTableIndex updates ReOrgHandle after one batch.
|
||||
// so we update the job ReOrgHandle here.
|
||||
job.ReOrgHandle = atomic.LoadInt64(&d.reOrgHandle)
|
||||
|
||||
if errors2.ErrorEqual(err, errWaitReorgTimeout) {
|
||||
// if timeout, we should return, check for the owner and re-wait job done.
|
||||
return nil
|
||||
@ -377,8 +385,7 @@ const maxBatchSize = 1024
|
||||
// 3, For one row, if the row has been already deleted, skip to next row.
|
||||
// 4, If not deleted, check whether index has existed, if existed, skip to next row.
|
||||
// 5, If index doesn't exist, create the index and then continue to handle next row.
|
||||
func (d *ddl) addTableIndex(t table.Table, indexInfo *model.IndexInfo, version uint64) error {
|
||||
seekHandle := int64(0)
|
||||
func (d *ddl) addTableIndex(t table.Table, indexInfo *model.IndexInfo, version uint64, seekHandle int64) error {
|
||||
for {
|
||||
handles, err := d.getSnapshotRows(t, version, seekHandle)
|
||||
if err != nil {
|
||||
@ -388,12 +395,14 @@ func (d *ddl) addTableIndex(t table.Table, indexInfo *model.IndexInfo, version u
|
||||
}
|
||||
|
||||
seekHandle = handles[len(handles)-1] + 1
|
||||
// TODO: save seekHandle in reorgnization job, so we can resume this job later from this handle.
|
||||
|
||||
err = d.backfillTableIndex(t, indexInfo, handles)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
// update reOrgHandle here after every successful batch.
|
||||
atomic.StoreInt64(&d.reOrgHandle, seekHandle)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -98,7 +98,6 @@ const waitReorgTimeout = 10 * time.Second
|
||||
var errWaitReorgTimeout = errors.New("wait for reorgnization timeout")
|
||||
|
||||
func (d *ddl) runReorgJob(f func() error) error {
|
||||
// TODO use persistent reorgnization job list.
|
||||
if d.reOrgDoneCh == nil {
|
||||
// start a reorgnization job
|
||||
d.reOrgDoneCh = make(chan error, 1)
|
||||
|
||||
@ -80,6 +80,11 @@ type Job struct {
|
||||
// snapshot version for this job.
|
||||
SnapshotVer uint64 `json:"snapshot_ver"`
|
||||
LastUpdateTS int64 `json:"last_update_ts"`
|
||||
// For reorgnization adding data job like add index, add column,
|
||||
// we will traverse the huge snapshot and add the data in batches.
|
||||
// After some successful batches, we will update the ReorgHandle for
|
||||
// later resuming if currect server crashed.
|
||||
ReOrgHandle int64 `json:"reorg_handle"`
|
||||
}
|
||||
|
||||
// Encode encodes job with json format.
|
||||
|
||||
Reference in New Issue
Block a user