*: rename job to ddlJob, rename task to bgJob

This commit is contained in:
xia
2016-01-20 12:17:11 +08:00
parent 071eff6716
commit 79aaefbbc0
15 changed files with 222 additions and 208 deletions

View File

@ -24,7 +24,8 @@ import (
"github.com/pingcap/tidb/terror"
)
func (d *ddl) handleTaskQueue() error {
// handleBgJobQueue handles background job queue.
func (d *ddl) handleBgJobQueue() error {
if d.isClosed() {
return nil
}
@ -32,7 +33,7 @@ func (d *ddl) handleTaskQueue() error {
task := &model.Job{}
err := kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error {
t := meta.NewMeta(txn)
owner, err := d.checkOwner(t, ddlTaskFlag)
owner, err := d.checkOwner(t, bgJobFlag)
if terror.ErrorEqual(err, ErrNotOwner) {
return nil
}
@ -40,8 +41,8 @@ func (d *ddl) handleTaskQueue() error {
return errors.Trace(err)
}
// get the first task and run
task, err = d.getFirstTask(t)
// get the first background job and run
task, err = d.getFirstBgJob(t)
if err != nil {
return errors.Trace(err)
}
@ -49,18 +50,18 @@ func (d *ddl) handleTaskQueue() error {
return nil
}
d.runTask(t, task)
d.runBgJob(t, task)
if task.IsFinished() {
err = d.finishTask(t, task)
err = d.finishBgJob(t, task)
} else {
err = d.updateTask(t, task)
err = d.updateBgJob(t, task)
}
if err != nil {
return errors.Trace(err)
}
owner.LastUpdateTS = time.Now().UnixNano()
err = t.SetDDLTaskOwner(owner)
err = t.SetBgJobOwner(owner)
return errors.Trace(err)
})
@ -72,7 +73,8 @@ func (d *ddl) handleTaskQueue() error {
return nil
}
func (d *ddl) runTask(t *meta.Meta, task *model.Job) {
// runBgJob runs background job.
func (d *ddl) runBgJob(t *meta.Meta, task *model.Job) {
task.State = model.JobRunning
var err error
@ -83,12 +85,12 @@ func (d *ddl) runTask(t *meta.Meta, task *model.Job) {
err = d.delReorgTable(t, task)
default:
task.State = model.JobCancelled
err = errors.Errorf("invalid task %v", task)
err = errors.Errorf("invalid background job %v", task)
}
if err != nil {
if task.State != model.JobCancelled {
log.Errorf("run task err %v", errors.ErrorStack(err))
log.Errorf("run background job err %v", errors.ErrorStack(err))
}
task.Error = err.Error()
@ -96,7 +98,8 @@ func (d *ddl) runTask(t *meta.Meta, task *model.Job) {
}
}
func (d *ddl) prepareTask(job *model.Job) error {
// prepareBgJob prepares background job.
func (d *ddl) prepareBgJob(job *model.Job) error {
task := &model.Job{
ID: job.ID,
SchemaID: job.SchemaID,
@ -107,7 +110,7 @@ func (d *ddl) prepareTask(job *model.Job) error {
err := kv.RunInNewTxn(d.store, true, func(txn kv.Transaction) error {
t := meta.NewMeta(txn)
err1 := t.EnQueueDDLTask(task)
err1 := t.EnQueueBgJob(task)
return errors.Trace(err1)
})
@ -115,38 +118,42 @@ func (d *ddl) prepareTask(job *model.Job) error {
return errors.Trace(err)
}
func (d *ddl) startTask(tp model.ActionType) {
// startBgJob starts background job.
func (d *ddl) startBgJob(tp model.ActionType) {
switch tp {
case model.ActionDropSchema, model.ActionDropTable:
asyncNotify(d.dropTaskCh)
asyncNotify(d.bgJobCh)
}
}
func (d *ddl) getFirstTask(t *meta.Meta) (*model.Job, error) {
task, err := t.GetDDLTask(0)
// getFirstBgJob gets the first background job.
func (d *ddl) getFirstBgJob(t *meta.Meta) (*model.Job, error) {
task, err := t.GetBgJob(0)
return task, errors.Trace(err)
}
func (d *ddl) updateTask(t *meta.Meta, task *model.Job) error {
err := t.UpdateDDLTask(0, task)
// updateBgJob updates background job.
func (d *ddl) updateBgJob(t *meta.Meta, task *model.Job) error {
err := t.UpdateBgJob(0, task)
return errors.Trace(err)
}
func (d *ddl) finishTask(t *meta.Meta, task *model.Job) error {
log.Warnf("[ddl] finish DDL task %v", task)
if _, err := t.DeQueueDDLTask(); err != nil {
// finishBgJob finishs background job.
func (d *ddl) finishBgJob(t *meta.Meta, task *model.Job) error {
log.Warnf("[ddl] finish background job %v", task)
if _, err := t.DeQueueBgJob(); err != nil {
return errors.Trace(err)
}
err := t.AddHistoryDDLTask(task)
err := t.AddHistoryBgJob(task)
return errors.Trace(err)
}
func (d *ddl) onExecute() {
func (d *ddl) onBackgroundWorker() {
defer d.wait.Done()
// ensure that have ddl job convert to ddl task
// ensure that have ddl job convert to background job.
checkTime := chooseLeaseTime(8*d.lease, 10*time.Second)
ticker := time.NewTicker(checkTime)
@ -155,15 +162,15 @@ func (d *ddl) onExecute() {
for {
select {
case <-ticker.C:
log.Debugf("[ddl] wait %s to check DDL task status again", checkTime)
case <-d.dropTaskCh:
log.Debugf("[ddl] wait %s to check background job status again", checkTime)
case <-d.bgJobCh:
case <-d.quitCh:
return
}
err := d.handleTaskQueue()
err := d.handleBgJobQueue()
if err != nil {
log.Errorf("[ddl] handle task err %v", errors.ErrorStack(err))
log.Errorf("[ddl] handle background job err %v", errors.ErrorStack(err))
}
}
}

View File

@ -37,25 +37,25 @@ func (s *testDDLSuite) TestDropSchemaError(c *C) {
Name: model.CIStr{O: "test"},
}},
}
d.prepareTask(task)
d.startTask(task.Type)
d.prepareBgJob(task)
d.startBgJob(task.Type)
time.Sleep(lease)
testCheckTaskCancelled(c, d, task)
testCheckBgJobCancelled(c, d, task)
}
func testCheckTaskCancelled(c *C, d *ddl, task *model.Job) {
func testCheckBgJobCancelled(c *C, d *ddl, task *model.Job) {
kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error {
t := meta.NewMeta(txn)
historyTask, err := t.GetHistoryDDLTask(task.ID)
historyBgJob, err := t.GetHistoryBgJob(task.ID)
c.Assert(err, IsNil)
c.Assert(historyTask.State, Equals, model.JobCancelled)
c.Assert(historyBgJob.State, Equals, model.JobCancelled)
return nil
})
}
func (s *testDDLSuite) TestInvalidTaskType(c *C) {
func (s *testDDLSuite) TestInvalidBgJobType(c *C) {
store := testCreateStore(c, "test_invalid_task_type")
defer store.Close()
@ -68,9 +68,9 @@ func (s *testDDLSuite) TestInvalidTaskType(c *C) {
TableID: 1,
Type: model.ActionCreateTable,
}
d.prepareTask(task)
d.startTask(model.ActionDropTable)
d.prepareBgJob(task)
d.startBgJob(model.ActionDropTable)
time.Sleep(lease)
testCheckTaskCancelled(c, d, task)
testCheckBgJobCancelled(c, d, task)
}

View File

@ -79,7 +79,7 @@ func testCreateColumn(c *C, ctx context.Context, d *ddl, dbInfo *model.DBInfo, t
Args: []interface{}{col, pos, 0},
}
err = d.startJob(ctx, job)
err = d.startDDLJob(ctx, job)
c.Assert(err, IsNil)
return job
}
@ -92,7 +92,7 @@ func testDropColumn(c *C, ctx context.Context, d *ddl, dbInfo *model.DBInfo, tbl
Args: []interface{}{model.NewCIStr(colName)},
}
err := d.startJob(ctx, job)
err := d.startDDLJob(ctx, job)
if isError {
c.Assert(err, NotNil)
return nil

View File

@ -71,12 +71,12 @@ type ddl struct {
hook Callback
store kv.Storage
// schema lease seconds.
lease time.Duration
uuid string
jobCh chan struct{}
jobDoneCh chan struct{}
// drop database/table task
dropTaskCh chan struct{}
lease time.Duration
uuid string
ddlJobCh chan struct{}
ddlJobDoneCh chan struct{}
// drop database/table job runs in the background to preform.
bgJobCh chan struct{}
// reorgDoneCh is for reorganization, if the reorganization job is done,
// we will use this channel to notify outer.
// TODO: now we use goroutine to simulate reorganization jobs, later we may
@ -98,14 +98,14 @@ func newDDL(store kv.Storage, infoHandle *infoschema.Handle, hook Callback, leas
}
d := &ddl{
infoHandle: infoHandle,
hook: hook,
store: store,
lease: lease,
uuid: uuid.NewV4().String(),
jobCh: make(chan struct{}, 1),
jobDoneCh: make(chan struct{}, 1),
dropTaskCh: make(chan struct{}, 1),
infoHandle: infoHandle,
hook: hook,
store: store,
lease: lease,
uuid: uuid.NewV4().String(),
ddlJobCh: make(chan struct{}, 1),
ddlJobDoneCh: make(chan struct{}, 1),
bgJobCh: make(chan struct{}, 1),
}
d.start()
@ -123,18 +123,24 @@ func (d *ddl) Stop() error {
err := kv.RunInNewTxn(d.store, true, func(txn kv.Transaction) error {
t := meta.NewMeta(txn)
owner, err := t.GetDDLOwner()
owner, err := t.GetDDLJobOwner()
if err != nil {
return errors.Trace(err)
}
// job's owner is me, clean it so other servers can compete for it quickly.
if owner != nil && owner.OwnerID == d.uuid {
if err = t.SetDDLOwner(&model.Owner{}); err != nil {
return errors.Trace(err)
}
if owner == nil && owner.OwnerID != d.uuid {
return nil
}
owner, err = t.GetDDLTaskOwner()
// job's owner is me, clean it so other servers can compete for it quickly.
return t.SetDDLJobOwner(&model.Owner{})
})
if err != nil {
return errors.Trace(err)
}
err = kv.RunInNewTxn(d.store, true, func(txn kv.Transaction) error {
t := meta.NewMeta(txn)
owner, err := t.GetBgJobOwner()
if err != nil {
return errors.Trace(err)
}
@ -142,8 +148,9 @@ func (d *ddl) Stop() error {
return nil
}
// task's owner is me, clean it so other servers can compete for it quickly.
return t.SetDDLTaskOwner(&model.Owner{})
return t.SetBgJobOwner(&model.Owner{})
})
return errors.Trace(err)
}
@ -163,12 +170,12 @@ func (d *ddl) Start() error {
func (d *ddl) start() {
d.quitCh = make(chan struct{})
d.wait.Add(2)
go d.onExecute()
go d.onWorker()
go d.onBackgroundWorker()
go d.onDDLWorker()
// for every start, we will send a fake job to let worker
// check owner first and try to find whether a job exists and run.
asyncNotify(d.jobCh)
asyncNotify(d.dropTaskCh)
asyncNotify(d.ddlJobCh)
asyncNotify(d.bgJobCh)
}
func (d *ddl) close() {
@ -261,7 +268,7 @@ func (d *ddl) CreateSchema(ctx context.Context, schema model.CIStr, charsetInfo
Args: []interface{}{dbInfo},
}
err = d.startJob(ctx, job)
err = d.startDDLJob(ctx, job)
err = d.hook.OnChanged(err)
return errors.Trace(err)
}
@ -278,7 +285,7 @@ func (d *ddl) DropSchema(ctx context.Context, schema model.CIStr) (err error) {
Type: model.ActionDropSchema,
}
err = d.startJob(ctx, job)
err = d.startDDLJob(ctx, job)
err = d.hook.OnChanged(err)
return errors.Trace(err)
}
@ -525,7 +532,7 @@ func (d *ddl) CreateTable(ctx context.Context, ident table.Ident, colDefs []*col
Args: []interface{}{tbInfo},
}
err = d.startJob(ctx, job)
err = d.startDDLJob(ctx, job)
err = d.hook.OnChanged(err)
return errors.Trace(err)
}
@ -618,7 +625,7 @@ func (d *ddl) AddColumn(ctx context.Context, ti table.Ident, spec *AlterSpecific
Args: []interface{}{&col.ColumnInfo, spec.Position, 0},
}
err = d.startJob(ctx, job)
err = d.startDDLJob(ctx, job)
err = d.hook.OnChanged(err)
return errors.Trace(err)
}
@ -649,7 +656,7 @@ func (d *ddl) DropColumn(ctx context.Context, ti table.Ident, colName model.CISt
Args: []interface{}{colName},
}
err = d.startJob(ctx, job)
err = d.startDDLJob(ctx, job)
err = d.hook.OnChanged(err)
return errors.Trace(err)
}
@ -673,7 +680,7 @@ func (d *ddl) DropTable(ctx context.Context, ti table.Ident) (err error) {
Type: model.ActionDropTable,
}
err = d.startJob(ctx, job)
err = d.startDDLJob(ctx, job)
err = d.hook.OnChanged(err)
return errors.Trace(err)
}
@ -701,7 +708,7 @@ func (d *ddl) CreateIndex(ctx context.Context, ti table.Ident, unique bool, inde
Args: []interface{}{unique, indexName, indexID, idxColNames},
}
err = d.startJob(ctx, job)
err = d.startDDLJob(ctx, job)
err = d.hook.OnChanged(err)
return errors.Trace(err)
}
@ -725,7 +732,7 @@ func (d *ddl) DropIndex(ctx context.Context, ti table.Ident, indexName model.CIS
Args: []interface{}{indexName},
}
err = d.startJob(ctx, job)
err = d.startDDLJob(ctx, job)
err = d.hook.OnChanged(err)
return errors.Trace(err)
}

View File

@ -25,7 +25,7 @@ import (
"github.com/pingcap/tidb/terror"
)
func (d *ddl) startJob(ctx context.Context, job *model.Job) error {
func (d *ddl) startDDLJob(ctx context.Context, job *model.Job) error {
// for every DDL, we must commit current transaction.
if err := ctx.FinishTxn(false); err != nil {
return errors.Trace(err)
@ -49,7 +49,7 @@ func (d *ddl) startJob(ctx context.Context, job *model.Job) error {
}
// notice worker that we push a new job and wait the job done.
asyncNotify(d.jobCh)
asyncNotify(d.ddlJobCh)
log.Warnf("[ddl] start DDL job %v", job)
@ -63,16 +63,16 @@ func (d *ddl) startJob(ctx context.Context, job *model.Job) error {
defer ticker.Stop()
for {
select {
case <-d.jobDoneCh:
case <-d.ddlJobDoneCh:
case <-ticker.C:
}
historyJob, err = d.getHistoryJob(jobID)
historyJob, err = d.getHistoryDDLJob(jobID)
if err != nil {
log.Errorf("[ddl] get history job err %v, check again", err)
log.Errorf("[ddl] get history DDL job err %v, check again", err)
continue
} else if historyJob == nil {
log.Warnf("[ddl] job %d is not in history, maybe not run", jobID)
log.Warnf("[ddl] DDL job %d is not in history, maybe not run", jobID)
continue
}
@ -85,7 +85,7 @@ func (d *ddl) startJob(ctx context.Context, job *model.Job) error {
}
}
func (d *ddl) getHistoryJob(id int64) (*model.Job, error) {
func (d *ddl) getHistoryDDLJob(id int64) (*model.Job, error) {
var job *model.Job
err := kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error {
@ -111,9 +111,9 @@ func (d *ddl) checkOwner(t *meta.Meta, flag string) (*model.Owner, error) {
switch flag {
case ddlJobFlag:
owner, err = t.GetDDLOwner()
case ddlTaskFlag:
owner, err = t.GetDDLTaskOwner()
owner, err = t.GetDDLJobOwner()
case bgJobFlag:
owner, err = t.GetBgJobOwner()
default:
err = errors.Errorf("invalid ddl flag %v", flag)
}
@ -132,8 +132,8 @@ func (d *ddl) checkOwner(t *meta.Meta, flag string) (*model.Owner, error) {
// the owner will update its owner status every 2 * lease time, so here we use
// 4 * lease to check its timeout.
maxTimeout := int64(4 * d.lease)
if flag == ddlTaskFlag {
// task doesn't need to guarantee other servers update the schema.
if flag == bgJobFlag {
// backgroun job doesn't need to guarantee other servers update the schema.
maxTimeout = int64(2 * d.lease)
}
if owner.OwnerID == d.uuid || now-owner.LastUpdateTS > maxTimeout {
@ -142,36 +142,36 @@ func (d *ddl) checkOwner(t *meta.Meta, flag string) (*model.Owner, error) {
// update status.
switch flag {
case ddlJobFlag:
err = t.SetDDLOwner(owner)
case ddlTaskFlag:
err = t.SetDDLTaskOwner(owner)
err = t.SetDDLJobOwner(owner)
case bgJobFlag:
err = t.SetBgJobOwner(owner)
}
if err != nil {
return nil, errors.Trace(err)
}
log.Debugf("[ddl] become %s owner %s", flag, owner.OwnerID)
log.Debugf("[ddl] become %s job owner %s", flag, owner.OwnerID)
}
if owner.OwnerID != d.uuid {
log.Debugf("[ddl] not %s owner, owner is %s", flag, owner.OwnerID)
log.Debugf("[ddl] not %s job owner, owner is %s", flag, owner.OwnerID)
return nil, errors.Trace(ErrNotOwner)
}
return owner, nil
}
func (d *ddl) getFirstJob(t *meta.Meta) (*model.Job, error) {
func (d *ddl) getFirstDDLJob(t *meta.Meta) (*model.Job, error) {
job, err := t.GetDDLJob(0)
return job, errors.Trace(err)
}
// every time we enter another state except final state, we must call this function.
func (d *ddl) updateJob(t *meta.Meta, job *model.Job) error {
func (d *ddl) updateDDLJob(t *meta.Meta, job *model.Job) error {
err := t.UpdateDDLJob(0, job)
return errors.Trace(err)
}
func (d *ddl) finishJob(t *meta.Meta, job *model.Job) error {
func (d *ddl) finishDDLJob(t *meta.Meta, job *model.Job) error {
log.Warnf("[ddl] finish DDL job %v", job)
// done, notice and run next job.
_, err := t.DeQueueDDLJob()
@ -180,7 +180,7 @@ func (d *ddl) finishJob(t *meta.Meta, job *model.Job) error {
}
switch job.Type {
case model.ActionDropSchema, model.ActionDropTable:
if err = d.prepareTask(job); err != nil {
if err = d.prepareBgJob(job); err != nil {
return errors.Trace(err)
}
}
@ -196,11 +196,11 @@ var ErrNotOwner = errors.New("DDL: not owner")
var ErrWorkerClosed = errors.New("DDL: worker is closed")
const (
ddlJobFlag = "job"
ddlTaskFlag = "task"
ddlJobFlag = "ddl"
bgJobFlag = "background"
)
func (d *ddl) handleJobQueue() error {
func (d *ddl) handleDDLJobQueue() error {
for {
if d.isClosed() {
return nil
@ -221,7 +221,7 @@ func (d *ddl) handleJobQueue() error {
// become the owner
// get the first job and run
job, err = d.getFirstJob(t)
job, err = d.getFirstDDLJob(t)
if job == nil || err != nil {
return errors.Trace(err)
}
@ -246,12 +246,12 @@ func (d *ddl) handleJobQueue() error {
// if run job meets error, we will save this error in job Error
// and retry later if the job is not cancelled.
d.runJob(t, job)
d.runDDLJob(t, job)
if job.IsFinished() {
err = d.finishJob(t, job)
err = d.finishDDLJob(t, job)
} else {
err = d.updateJob(t, job)
err = d.updateDDLJob(t, job)
}
if err != nil {
return errors.Trace(err)
@ -260,7 +260,7 @@ func (d *ddl) handleJobQueue() error {
// running job may cost some time, so here we must update owner status to
// prevent other become the owner.
owner.LastUpdateTS = time.Now().UnixNano()
err = t.SetDDLOwner(owner)
err = t.SetDDLJobOwner(owner)
return errors.Trace(err)
})
@ -281,8 +281,8 @@ func (d *ddl) handleJobQueue() error {
}
if job.IsFinished() {
d.startTask(job.Type)
asyncNotify(d.jobDoneCh)
d.startBgJob(job.Type)
asyncNotify(d.ddlJobDoneCh)
}
}
}
@ -295,9 +295,9 @@ func chooseLeaseTime(n1 time.Duration, n2 time.Duration) time.Duration {
return n2
}
// onWorker is for async online schema change, it will try to become the owner first,
// onDDLWorker is for async online schema change, it will try to become the owner first,
// then wait or pull the job queue to handle a schema change job.
func (d *ddl) onWorker() {
func (d *ddl) onDDLWorker() {
defer d.wait.Done()
// we use 4 * lease time to check owner's timeout, so here, we will update owner's status
@ -311,19 +311,19 @@ func (d *ddl) onWorker() {
select {
case <-ticker.C:
log.Debugf("[ddl] wait %s to check DDL status again", checkTime)
case <-d.jobCh:
case <-d.ddlJobCh:
case <-d.quitCh:
return
}
err := d.handleJobQueue()
err := d.handleDDLJobQueue()
if err != nil {
log.Errorf("[ddl] handle job err %v", errors.ErrorStack(err))
log.Errorf("[ddl] handle ddl job err %v", errors.ErrorStack(err))
}
}
}
func (d *ddl) runJob(t *meta.Meta, job *model.Job) {
func (d *ddl) runDDLJob(t *meta.Meta, job *model.Job) {
if job.IsFinished() {
return
}
@ -351,14 +351,14 @@ func (d *ddl) runJob(t *meta.Meta, job *model.Job) {
default:
// invalid job, cancel it.
job.State = model.JobCancelled
err = errors.Errorf("invalid job %v", job)
err = errors.Errorf("invalid ddl job %v", job)
}
// saves error in job, so that others can know error happens.
if err != nil {
// if job is not cancelled, we should log this error.
if job.State != model.JobCancelled {
log.Errorf("run job err %v", errors.ErrorStack(err))
log.Errorf("run ddl job err %v", errors.ErrorStack(err))
}
job.Error = err.Error()

View File

@ -75,19 +75,19 @@ func (s *testDDLSuite) TestCheckOwner(c *C) {
time.Sleep(lease)
testCheckOwner(c, d1, true, ddlJobFlag)
testCheckOwner(c, d1, true, ddlTaskFlag)
testCheckOwner(c, d1, true, bgJobFlag)
d2 := newDDL(store, nil, nil, lease)
defer d2.close()
testCheckOwner(c, d2, false, ddlJobFlag)
testCheckOwner(c, d2, false, ddlTaskFlag)
testCheckOwner(c, d2, false, bgJobFlag)
d1.close()
time.Sleep(6 * lease)
testCheckOwner(c, d2, true, ddlJobFlag)
testCheckOwner(c, d2, true, ddlTaskFlag)
testCheckOwner(c, d2, true, bgJobFlag)
d2.SetLease(1 * time.Second)
@ -101,7 +101,7 @@ func (s *testDDLSuite) TestCheckOwner(c *C) {
c.Assert(err, IsNil)
testCheckOwner(c, d1, true, ddlJobFlag)
testCheckOwner(c, d1, true, ddlTaskFlag)
testCheckOwner(c, d1, true, bgJobFlag)
d2.SetLease(1 * time.Second)
d2.SetLease(2 * time.Second)
@ -125,7 +125,7 @@ func (s *testDDLSuite) TestSchemaError(c *C) {
ctx := testNewContext(c, d)
err := d.startJob(ctx, job)
err := d.startDDLJob(ctx, job)
c.Assert(err, NotNil)
testCheckJobCancelled(c, d, job)
}
@ -148,7 +148,7 @@ func (s *testDDLSuite) TestTableError(c *C) {
ctx := testNewContext(c, d)
err := d.startJob(ctx, job)
err := d.startDDLJob(ctx, job)
c.Assert(err, NotNil)
testCheckJobCancelled(c, d, job)
@ -157,7 +157,7 @@ func (s *testDDLSuite) TestTableError(c *C) {
tblInfo := testTableInfo(c, d, "t", 3)
job.Args = []interface{}{tblInfo}
err = d.startJob(ctx, job)
err = d.startDDLJob(ctx, job)
c.Assert(err, NotNil)
testCheckJobCancelled(c, d, job)
@ -167,7 +167,7 @@ func (s *testDDLSuite) TestTableError(c *C) {
Type: model.ActionDropTable,
}
err = d.startJob(ctx, job)
err = d.startDDLJob(ctx, job)
c.Assert(err, NotNil)
testCheckJobCancelled(c, d, job)
@ -180,7 +180,7 @@ func (s *testDDLSuite) TestTableError(c *C) {
Type: model.ActionDropTable,
}
err = d.startJob(ctx, job)
err = d.startDDLJob(ctx, job)
c.Assert(err, NotNil)
testCheckJobCancelled(c, d, job)
@ -217,7 +217,7 @@ func (s *testDDLSuite) TestIndexError(c *C) {
Type: model.ActionAddIndex,
}
err := d.startJob(ctx, job)
err := d.startDDLJob(ctx, job)
c.Assert(err, NotNil)
testCheckJobCancelled(c, d, job)
@ -227,7 +227,7 @@ func (s *testDDLSuite) TestIndexError(c *C) {
Type: model.ActionDropIndex,
}
err = d.startJob(ctx, job)
err = d.startDDLJob(ctx, job)
c.Assert(err, NotNil)
testCheckJobCancelled(c, d, job)
@ -243,7 +243,7 @@ func (s *testDDLSuite) TestIndexError(c *C) {
Type: model.ActionAddIndex,
Args: []interface{}{1},
}
err = d.startJob(ctx, job)
err = d.startDDLJob(ctx, job)
c.Assert(err, NotNil)
testCheckJobCancelled(c, d, job)
@ -253,7 +253,7 @@ func (s *testDDLSuite) TestIndexError(c *C) {
Type: model.ActionAddIndex,
Args: []interface{}{false, model.NewCIStr("t"), []*coldef.IndexColName{{ColumnName: "c", Length: 256}}},
}
err = d.startJob(ctx, job)
err = d.startDDLJob(ctx, job)
c.Assert(err, NotNil)
testCheckJobCancelled(c, d, job)
@ -263,7 +263,7 @@ func (s *testDDLSuite) TestIndexError(c *C) {
Type: model.ActionAddIndex,
Args: []interface{}{false, model.NewCIStr("c1_index"), []*coldef.IndexColName{{ColumnName: "c", Length: 256}}},
}
err = d.startJob(ctx, job)
err = d.startDDLJob(ctx, job)
c.Assert(err, NotNil)
testCheckJobCancelled(c, d, job)
@ -275,7 +275,7 @@ func (s *testDDLSuite) TestIndexError(c *C) {
Type: model.ActionAddIndex,
Args: []interface{}{false, model.NewCIStr("c1_index"), []*coldef.IndexColName{{ColumnName: "c1", Length: 256}}},
}
err = d.startJob(ctx, job)
err = d.startDDLJob(ctx, job)
c.Assert(err, NotNil)
testCheckJobCancelled(c, d, job)
@ -285,7 +285,7 @@ func (s *testDDLSuite) TestIndexError(c *C) {
Type: model.ActionDropIndex,
Args: []interface{}{1},
}
err = d.startJob(ctx, job)
err = d.startDDLJob(ctx, job)
c.Assert(err, NotNil)
testCheckJobCancelled(c, d, job)
@ -297,7 +297,7 @@ func (s *testDDLSuite) TestIndexError(c *C) {
Type: model.ActionDropIndex,
Args: []interface{}{model.NewCIStr("c1_index")},
}
err = d.startJob(ctx, job)
err = d.startDDLJob(ctx, job)
c.Assert(err, NotNil)
testCheckJobCancelled(c, d, job)
}
@ -319,7 +319,7 @@ func (s *testDDLSuite) TestColumnError(c *C) {
Type: model.ActionAddColumn,
}
err := d.startJob(ctx, job)
err := d.startDDLJob(ctx, job)
c.Assert(err, NotNil)
testCheckJobCancelled(c, d, job)
@ -329,7 +329,7 @@ func (s *testDDLSuite) TestColumnError(c *C) {
Type: model.ActionDropColumn,
}
err = d.startJob(ctx, job)
err = d.startDDLJob(ctx, job)
c.Assert(err, NotNil)
testCheckJobCancelled(c, d, job)
@ -358,7 +358,7 @@ func (s *testDDLSuite) TestColumnError(c *C) {
Args: []interface{}{col, pos, 0},
}
err = d.startJob(ctx, job)
err = d.startDDLJob(ctx, job)
c.Assert(err, NotNil)
testCheckJobCancelled(c, d, job)
@ -369,7 +369,7 @@ func (s *testDDLSuite) TestColumnError(c *C) {
Args: []interface{}{1},
}
err = d.startJob(ctx, job)
err = d.startDDLJob(ctx, job)
c.Assert(err, NotNil)
testCheckJobCancelled(c, d, job)
}

View File

@ -67,7 +67,7 @@ func testCreateIndex(c *C, ctx context.Context, d *ddl, dbInfo *model.DBInfo, tb
Args: []interface{}{unique, model.NewCIStr(indexName), id, []*coldef.IndexColName{{ColumnName: colName, Length: 256}}},
}
err = d.startJob(ctx, job)
err = d.startDDLJob(ctx, job)
c.Assert(err, IsNil)
return job
}
@ -80,7 +80,7 @@ func testDropIndex(c *C, ctx context.Context, d *ddl, dbInfo *model.DBInfo, tblI
Args: []interface{}{model.NewCIStr(indexName)},
}
err := d.startJob(ctx, job)
err := d.startDDLJob(ctx, job)
c.Assert(err, IsNil)
return job
}

View File

@ -141,7 +141,7 @@ func (d *ddl) isReorgRunnable(txn kv.Transaction) error {
}
t := meta.NewMeta(txn)
owner, err := t.GetDDLOwner()
owner, err := t.GetDDLJobOwner()
if err != nil {
return errors.Trace(err)
} else if owner == nil || owner.OwnerID != d.uuid {

View File

@ -48,7 +48,7 @@ func testCreateSchema(c *C, ctx context.Context, d *ddl, dbInfo *model.DBInfo) *
Args: []interface{}{dbInfo},
}
err := d.startJob(ctx, job)
err := d.startDDLJob(ctx, job)
c.Assert(err, IsNil)
return job
}
@ -59,13 +59,13 @@ func testDropSchema(c *C, ctx context.Context, d *ddl, dbInfo *model.DBInfo) *mo
Type: model.ActionDropSchema,
}
err := d.startJob(ctx, job)
err := d.startDDLJob(ctx, job)
c.Assert(err, IsNil)
return job
}
func checkDrop(c *C, t *meta.Meta) bool {
task, err := t.GetDDLTask(0)
task, err := t.GetBgJob(0)
c.Assert(err, IsNil)
if task == nil {
return true
@ -156,7 +156,7 @@ func (s *testSchemaSuite) TestSchema(c *C) {
Type: model.ActionDropSchema,
}
err := d1.startJob(ctx, job)
err := d1.startDDLJob(ctx, job)
c.Assert(terror.ErrorEqual(err, infoschema.DatabaseNotExists), IsTrue)
}
@ -195,7 +195,7 @@ func (s *testSchemaSuite) TestSchemaWaitJob(c *C) {
Args: []interface{}{dbInfo},
}
err = d2.startJob(ctx, job)
err = d2.startDDLJob(ctx, job)
c.Assert(err, NotNil)
testCheckJobCancelled(c, d2, job)
@ -207,7 +207,7 @@ func testRunInterruptedJob(c *C, d *ddl, job *model.Job) {
ctx := mock.NewContext()
done := make(chan error, 1)
go func() {
done <- d.startJob(ctx, job)
done <- d.startDDLJob(ctx, job)
}()
ticker := time.NewTicker(d.lease * 1)

View File

@ -59,7 +59,7 @@ func (s *testStatSuite) TestStat(c *C) {
ctx := mock.NewContext()
done := make(chan error, 1)
go func() {
done <- d.startJob(ctx, job)
done <- d.startDDLJob(ctx, job)
}()
ticker := time.NewTicker(d.lease * 1)

View File

@ -77,7 +77,7 @@ func testCreateTable(c *C, ctx context.Context, d *ddl, dbInfo *model.DBInfo, tb
Args: []interface{}{tblInfo},
}
err := d.startJob(ctx, job)
err := d.startDDLJob(ctx, job)
c.Assert(err, IsNil)
return job
}
@ -89,7 +89,7 @@ func testDropTable(c *C, ctx context.Context, d *ddl, dbInfo *model.DBInfo, tblI
Type: model.ActionDropTable,
}
err := d.startJob(ctx, job)
err := d.startDDLJob(ctx, job)
c.Assert(err, IsNil)
return job
}
@ -168,7 +168,7 @@ func (s *testTableSuite) TestTable(c *C) {
Args: []interface{}{newTblInfo},
}
err := d.startJob(ctx, job)
err := d.startDDLJob(ctx, job)
c.Assert(err, NotNil)
testCheckJobCancelled(c, d, job)

View File

@ -41,7 +41,7 @@ func GetDDLInfo(txn kv.Transaction) (*DDLInfo, error) {
info := &DDLInfo{}
t := meta.NewMeta(txn)
info.Owner, err = t.GetDDLOwner()
info.Owner, err = t.GetDDLJobOwner()
if err != nil {
return nil, errors.Trace(err)
}

View File

@ -130,7 +130,7 @@ func (s *testSuite) TestGetDDLInfo(c *C) {
t := meta.NewMeta(txn)
owner := &model.Owner{OwnerID: "owner"}
err = t.SetDDLOwner(owner)
err = t.SetDDLJobOwner(owner)
c.Assert(err, IsNil)
dbInfo2 := &model.DBInfo{
ID: 2,

View File

@ -416,13 +416,13 @@ func (m *Meta) GetTable(dbID int64, tableID int64) (*model.TableInfo, error) {
// to operate DDL jobs, and dispatch them to MR Jobs.
var (
mDDLOwnerKey = []byte("DDLOwner")
mDDLJobOwnerKey = []byte("DDLJobOwner")
mDDLJobListKey = []byte("DDLJobList")
mDDLJobHistoryKey = []byte("DDLJobHistory")
mDDLJobReorgKey = []byte("DDLJobReorg")
)
func (m *Meta) getDDLOwner(key []byte) (*model.Owner, error) {
func (m *Meta) getJobOwner(key []byte) (*model.Owner, error) {
value, err := m.txn.Get(key)
if err != nil || value == nil {
return nil, errors.Trace(err)
@ -433,12 +433,12 @@ func (m *Meta) getDDLOwner(key []byte) (*model.Owner, error) {
return owner, errors.Trace(err)
}
// GetDDLOwner gets the current owner for DDL.
func (m *Meta) GetDDLOwner() (*model.Owner, error) {
return m.getDDLOwner(mDDLOwnerKey)
// GetDDLJobOwner gets the current owner for DDL.
func (m *Meta) GetDDLJobOwner() (*model.Owner, error) {
return m.getJobOwner(mDDLJobOwnerKey)
}
func (m *Meta) setDDLOwner(key []byte, o *model.Owner) error {
func (m *Meta) setJobOwner(key []byte, o *model.Owner) error {
b, err := json.Marshal(o)
if err != nil {
return errors.Trace(err)
@ -446,9 +446,9 @@ func (m *Meta) setDDLOwner(key []byte, o *model.Owner) error {
return m.txn.Set(key, b)
}
// SetDDLOwner sets the current owner for DDL.
func (m *Meta) SetDDLOwner(o *model.Owner) error {
return m.setDDLOwner(mDDLOwnerKey, o)
// SetDDLJobOwner sets the current owner for DDL.
func (m *Meta) SetDDLJobOwner(o *model.Owner) error {
return m.setJobOwner(mDDLJobOwnerKey, o)
}
func (m *Meta) enQueueDDLJob(key []byte, job *model.Job) error {
@ -587,64 +587,64 @@ func (m *Meta) GetDDLReorgHandle(job *model.Job) (int64, error) {
return value, errors.Trace(err)
}
// DDL task structure
// DDLTaskOnwer: []byte
// DDLTaskList: list tasks
// DDLTaskHistory: hash
// DDLTaskReorg: hash
// DDL background job structure
// BgJobOnwer: []byte
// BgJobList: list tasks
// BgJobHistory: hash
// BgJobReorg: hash
//
// for multi DDL executor, only one can become the owner
// to operate DDL tasks, and dispatch them to MR tasks.
// for multi background worker, only one can become the owner
// to operate background job, and dispatch them to MR background job.
var (
mDDLTaskOwnerKey = []byte("DDLTaskOwner")
mDDLTaskListKey = []byte("DDLTaskList")
mDDLTaskHistoryKey = []byte("DDLTaskHistory")
mBgJobOwnerKey = []byte("BgJobOwner")
mBgJobListKey = []byte("BgJobList")
mBgJobHistoryKey = []byte("BgJobHistory")
)
// UpdateDDLTask updates the DDL task with index.
func (m *Meta) UpdateDDLTask(index int64, task *model.Job) error {
return m.updateDDLJob(index, task, mDDLTaskListKey)
// UpdateBgJob updates the background job with index.
func (m *Meta) UpdateBgJob(index int64, task *model.Job) error {
return m.updateDDLJob(index, task, mBgJobListKey)
}
// GetDDLTask returns the DDL task with index.
func (m *Meta) GetDDLTask(index int64) (*model.Job, error) {
task, err := m.getDDLJob(mDDLTaskListKey, index)
// GetBgJob returns the background job with index.
func (m *Meta) GetBgJob(index int64) (*model.Job, error) {
task, err := m.getDDLJob(mBgJobListKey, index)
return task, errors.Trace(err)
}
// EnQueueDDLTask adds a DDL task to the list.
func (m *Meta) EnQueueDDLTask(task *model.Job) error {
return m.enQueueDDLJob(mDDLTaskListKey, task)
// EnQueueBgJob adds a background job to the list.
func (m *Meta) EnQueueBgJob(task *model.Job) error {
return m.enQueueDDLJob(mBgJobListKey, task)
}
// DDLTaskLength returns the DDL task length.
func (m *Meta) DDLTaskLength() (int64, error) {
return m.txn.LLen(mDDLTaskListKey)
// BgJobLength returns the background job length.
func (m *Meta) BgJobLength() (int64, error) {
return m.txn.LLen(mBgJobListKey)
}
// AddHistoryDDLTask adds DDL task to history.
func (m *Meta) AddHistoryDDLTask(task *model.Job) error {
return m.addHistoryDDLJob(mDDLTaskHistoryKey, task)
// AddHistoryBgJob adds background job to history.
func (m *Meta) AddHistoryBgJob(task *model.Job) error {
return m.addHistoryDDLJob(mBgJobHistoryKey, task)
}
// GetHistoryDDLTask gets a history DDL task.
func (m *Meta) GetHistoryDDLTask(id int64) (*model.Job, error) {
return m.getHistoryDDLJob(mDDLTaskHistoryKey, id)
// GetHistoryBgJob gets a history background job.
func (m *Meta) GetHistoryBgJob(id int64) (*model.Job, error) {
return m.getHistoryDDLJob(mBgJobHistoryKey, id)
}
// DeQueueDDLTask pops a DDL task from the list.
func (m *Meta) DeQueueDDLTask() (*model.Job, error) {
return m.deQueueDDLJob(mDDLTaskListKey)
// DeQueueBgJob pops a background job from the list.
func (m *Meta) DeQueueBgJob() (*model.Job, error) {
return m.deQueueDDLJob(mBgJobListKey)
}
// GetDDLTaskOwner gets the current task owner for DDL.
func (m *Meta) GetDDLTaskOwner() (*model.Owner, error) {
return m.getDDLOwner(mDDLTaskOwnerKey)
// GetBgJobOwner gets the current background job owner.
func (m *Meta) GetBgJobOwner() (*model.Owner, error) {
return m.getJobOwner(mBgJobOwnerKey)
}
// SetDDLTaskOwner sets the current task owner for DDL.
func (m *Meta) SetDDLTaskOwner(o *model.Owner) error {
return m.setDDLOwner(mDDLTaskOwnerKey, o)
// SetBgJobOwner sets the current background job owner.
func (m *Meta) SetBgJobOwner(o *model.Owner) error {
return m.setJobOwner(mBgJobOwnerKey, o)
}

View File

@ -175,9 +175,9 @@ func (s *testSuite) TestDDL(c *C) {
t := meta.NewMeta(txn)
owner := &model.Owner{OwnerID: "1"}
err = t.SetDDLOwner(owner)
err = t.SetDDLJobOwner(owner)
c.Assert(err, IsNil)
ov, err := t.GetDDLOwner()
ov, err := t.GetDDLJobOwner()
c.Assert(err, IsNil)
c.Assert(owner, DeepEquals, ov)
@ -219,36 +219,36 @@ func (s *testSuite) TestDDL(c *C) {
c.Assert(v, DeepEquals, job)
// DDL task test
err = t.SetDDLTaskOwner(owner)
err = t.SetBgJobOwner(owner)
c.Assert(err, IsNil)
ov, err = t.GetDDLTaskOwner()
ov, err = t.GetBgJobOwner()
c.Assert(err, IsNil)
c.Assert(owner, DeepEquals, ov)
task := &model.Job{ID: 1}
err = t.EnQueueDDLTask(task)
err = t.EnQueueBgJob(task)
c.Assert(err, IsNil)
n, err = t.DDLTaskLength()
n, err = t.BgJobLength()
c.Assert(err, IsNil)
c.Assert(n, Equals, int64(1))
v, err = t.GetDDLTask(0)
v, err = t.GetBgJob(0)
c.Assert(err, IsNil)
c.Assert(v, DeepEquals, task)
v, err = t.GetDDLTask(1)
v, err = t.GetBgJob(1)
c.Assert(err, IsNil)
c.Assert(v, IsNil)
task.ID = 2
err = t.UpdateDDLTask(0, task)
err = t.UpdateBgJob(0, task)
c.Assert(err, IsNil)
v, err = t.DeQueueDDLTask()
v, err = t.DeQueueBgJob()
c.Assert(err, IsNil)
c.Assert(v, DeepEquals, task)
err = t.AddHistoryDDLTask(task)
err = t.AddHistoryBgJob(task)
c.Assert(err, IsNil)
v, err = t.GetHistoryDDLTask(2)
v, err = t.GetHistoryBgJob(2)
c.Assert(err, IsNil)
c.Assert(v, DeepEquals, task)