diff --git a/ddl/executor.go b/ddl/bg_worker.go similarity index 60% rename from ddl/executor.go rename to ddl/bg_worker.go index 0fd074752d..ac9319daa6 100644 --- a/ddl/executor.go +++ b/ddl/bg_worker.go @@ -24,7 +24,8 @@ import ( "github.com/pingcap/tidb/terror" ) -func (d *ddl) handleTaskQueue() error { +// handleBgJobQueue handles background job queue. +func (d *ddl) handleBgJobQueue() error { if d.isClosed() { return nil } @@ -32,7 +33,7 @@ func (d *ddl) handleTaskQueue() error { task := &model.Job{} err := kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error { t := meta.NewMeta(txn) - owner, err := d.checkOwner(t, ddlTaskFlag) + owner, err := d.checkOwner(t, bgJobFlag) if terror.ErrorEqual(err, ErrNotOwner) { return nil } @@ -40,8 +41,8 @@ func (d *ddl) handleTaskQueue() error { return errors.Trace(err) } - // get the first task and run - task, err = d.getFirstTask(t) + // get the first background job and run + task, err = d.getFirstBgJob(t) if err != nil { return errors.Trace(err) } @@ -49,18 +50,18 @@ func (d *ddl) handleTaskQueue() error { return nil } - d.runTask(t, task) + d.runBgJob(t, task) if task.IsFinished() { - err = d.finishTask(t, task) + err = d.finishBgJob(t, task) } else { - err = d.updateTask(t, task) + err = d.updateBgJob(t, task) } if err != nil { return errors.Trace(err) } owner.LastUpdateTS = time.Now().UnixNano() - err = t.SetDDLTaskOwner(owner) + err = t.SetBgJobOwner(owner) return errors.Trace(err) }) @@ -72,7 +73,8 @@ func (d *ddl) handleTaskQueue() error { return nil } -func (d *ddl) runTask(t *meta.Meta, task *model.Job) { +// runBgJob runs background job. +func (d *ddl) runBgJob(t *meta.Meta, task *model.Job) { task.State = model.JobRunning var err error @@ -83,12 +85,12 @@ func (d *ddl) runTask(t *meta.Meta, task *model.Job) { err = d.delReorgTable(t, task) default: task.State = model.JobCancelled - err = errors.Errorf("invalid task %v", task) + err = errors.Errorf("invalid background job %v", task) } if err != nil { if task.State != model.JobCancelled { - log.Errorf("run task err %v", errors.ErrorStack(err)) + log.Errorf("run background job err %v", errors.ErrorStack(err)) } task.Error = err.Error() @@ -96,7 +98,8 @@ func (d *ddl) runTask(t *meta.Meta, task *model.Job) { } } -func (d *ddl) prepareTask(job *model.Job) error { +// prepareBgJob prepares background job. +func (d *ddl) prepareBgJob(job *model.Job) error { task := &model.Job{ ID: job.ID, SchemaID: job.SchemaID, @@ -107,7 +110,7 @@ func (d *ddl) prepareTask(job *model.Job) error { err := kv.RunInNewTxn(d.store, true, func(txn kv.Transaction) error { t := meta.NewMeta(txn) - err1 := t.EnQueueDDLTask(task) + err1 := t.EnQueueBgJob(task) return errors.Trace(err1) }) @@ -115,38 +118,42 @@ func (d *ddl) prepareTask(job *model.Job) error { return errors.Trace(err) } -func (d *ddl) startTask(tp model.ActionType) { +// startBgJob starts background job. +func (d *ddl) startBgJob(tp model.ActionType) { switch tp { case model.ActionDropSchema, model.ActionDropTable: - asyncNotify(d.dropTaskCh) + asyncNotify(d.bgJobCh) } } -func (d *ddl) getFirstTask(t *meta.Meta) (*model.Job, error) { - task, err := t.GetDDLTask(0) +// getFirstBgJob gets the first background job. +func (d *ddl) getFirstBgJob(t *meta.Meta) (*model.Job, error) { + task, err := t.GetBgJob(0) return task, errors.Trace(err) } -func (d *ddl) updateTask(t *meta.Meta, task *model.Job) error { - err := t.UpdateDDLTask(0, task) +// updateBgJob updates background job. +func (d *ddl) updateBgJob(t *meta.Meta, task *model.Job) error { + err := t.UpdateBgJob(0, task) return errors.Trace(err) } -func (d *ddl) finishTask(t *meta.Meta, task *model.Job) error { - log.Warnf("[ddl] finish DDL task %v", task) - if _, err := t.DeQueueDDLTask(); err != nil { +// finishBgJob finishs background job. +func (d *ddl) finishBgJob(t *meta.Meta, task *model.Job) error { + log.Warnf("[ddl] finish background job %v", task) + if _, err := t.DeQueueBgJob(); err != nil { return errors.Trace(err) } - err := t.AddHistoryDDLTask(task) + err := t.AddHistoryBgJob(task) return errors.Trace(err) } -func (d *ddl) onExecute() { +func (d *ddl) onBackgroundWorker() { defer d.wait.Done() - // ensure that have ddl job convert to ddl task + // ensure that have ddl job convert to background job. checkTime := chooseLeaseTime(8*d.lease, 10*time.Second) ticker := time.NewTicker(checkTime) @@ -155,15 +162,15 @@ func (d *ddl) onExecute() { for { select { case <-ticker.C: - log.Debugf("[ddl] wait %s to check DDL task status again", checkTime) - case <-d.dropTaskCh: + log.Debugf("[ddl] wait %s to check background job status again", checkTime) + case <-d.bgJobCh: case <-d.quitCh: return } - err := d.handleTaskQueue() + err := d.handleBgJobQueue() if err != nil { - log.Errorf("[ddl] handle task err %v", errors.ErrorStack(err)) + log.Errorf("[ddl] handle background job err %v", errors.ErrorStack(err)) } } } diff --git a/ddl/executor_test.go b/ddl/bg_worker_test.go similarity index 78% rename from ddl/executor_test.go rename to ddl/bg_worker_test.go index 2770fbdabc..9234a05cb7 100644 --- a/ddl/executor_test.go +++ b/ddl/bg_worker_test.go @@ -37,25 +37,25 @@ func (s *testDDLSuite) TestDropSchemaError(c *C) { Name: model.CIStr{O: "test"}, }}, } - d.prepareTask(task) - d.startTask(task.Type) + d.prepareBgJob(task) + d.startBgJob(task.Type) time.Sleep(lease) - testCheckTaskCancelled(c, d, task) + testCheckBgJobCancelled(c, d, task) } -func testCheckTaskCancelled(c *C, d *ddl, task *model.Job) { +func testCheckBgJobCancelled(c *C, d *ddl, task *model.Job) { kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error { t := meta.NewMeta(txn) - historyTask, err := t.GetHistoryDDLTask(task.ID) + historyBgJob, err := t.GetHistoryBgJob(task.ID) c.Assert(err, IsNil) - c.Assert(historyTask.State, Equals, model.JobCancelled) + c.Assert(historyBgJob.State, Equals, model.JobCancelled) return nil }) } -func (s *testDDLSuite) TestInvalidTaskType(c *C) { +func (s *testDDLSuite) TestInvalidBgJobType(c *C) { store := testCreateStore(c, "test_invalid_task_type") defer store.Close() @@ -68,9 +68,9 @@ func (s *testDDLSuite) TestInvalidTaskType(c *C) { TableID: 1, Type: model.ActionCreateTable, } - d.prepareTask(task) - d.startTask(model.ActionDropTable) + d.prepareBgJob(task) + d.startBgJob(model.ActionDropTable) time.Sleep(lease) - testCheckTaskCancelled(c, d, task) + testCheckBgJobCancelled(c, d, task) } diff --git a/ddl/column_test.go b/ddl/column_test.go index e9169c50dd..e331a38099 100644 --- a/ddl/column_test.go +++ b/ddl/column_test.go @@ -79,7 +79,7 @@ func testCreateColumn(c *C, ctx context.Context, d *ddl, dbInfo *model.DBInfo, t Args: []interface{}{col, pos, 0}, } - err = d.startJob(ctx, job) + err = d.startDDLJob(ctx, job) c.Assert(err, IsNil) return job } @@ -92,7 +92,7 @@ func testDropColumn(c *C, ctx context.Context, d *ddl, dbInfo *model.DBInfo, tbl Args: []interface{}{model.NewCIStr(colName)}, } - err := d.startJob(ctx, job) + err := d.startDDLJob(ctx, job) if isError { c.Assert(err, NotNil) return nil diff --git a/ddl/ddl.go b/ddl/ddl.go index b7a10f4b39..7ccc312515 100644 --- a/ddl/ddl.go +++ b/ddl/ddl.go @@ -71,12 +71,12 @@ type ddl struct { hook Callback store kv.Storage // schema lease seconds. - lease time.Duration - uuid string - jobCh chan struct{} - jobDoneCh chan struct{} - // drop database/table task - dropTaskCh chan struct{} + lease time.Duration + uuid string + ddlJobCh chan struct{} + ddlJobDoneCh chan struct{} + // drop database/table job runs in the background to preform. + bgJobCh chan struct{} // reorgDoneCh is for reorganization, if the reorganization job is done, // we will use this channel to notify outer. // TODO: now we use goroutine to simulate reorganization jobs, later we may @@ -98,14 +98,14 @@ func newDDL(store kv.Storage, infoHandle *infoschema.Handle, hook Callback, leas } d := &ddl{ - infoHandle: infoHandle, - hook: hook, - store: store, - lease: lease, - uuid: uuid.NewV4().String(), - jobCh: make(chan struct{}, 1), - jobDoneCh: make(chan struct{}, 1), - dropTaskCh: make(chan struct{}, 1), + infoHandle: infoHandle, + hook: hook, + store: store, + lease: lease, + uuid: uuid.NewV4().String(), + ddlJobCh: make(chan struct{}, 1), + ddlJobDoneCh: make(chan struct{}, 1), + bgJobCh: make(chan struct{}, 1), } d.start() @@ -123,18 +123,24 @@ func (d *ddl) Stop() error { err := kv.RunInNewTxn(d.store, true, func(txn kv.Transaction) error { t := meta.NewMeta(txn) - owner, err := t.GetDDLOwner() + owner, err := t.GetDDLJobOwner() if err != nil { return errors.Trace(err) } - // job's owner is me, clean it so other servers can compete for it quickly. - if owner != nil && owner.OwnerID == d.uuid { - if err = t.SetDDLOwner(&model.Owner{}); err != nil { - return errors.Trace(err) - } + if owner == nil && owner.OwnerID != d.uuid { + return nil } - owner, err = t.GetDDLTaskOwner() + // job's owner is me, clean it so other servers can compete for it quickly. + return t.SetDDLJobOwner(&model.Owner{}) + }) + if err != nil { + return errors.Trace(err) + } + + err = kv.RunInNewTxn(d.store, true, func(txn kv.Transaction) error { + t := meta.NewMeta(txn) + owner, err := t.GetBgJobOwner() if err != nil { return errors.Trace(err) } @@ -142,8 +148,9 @@ func (d *ddl) Stop() error { return nil } // task's owner is me, clean it so other servers can compete for it quickly. - return t.SetDDLTaskOwner(&model.Owner{}) + return t.SetBgJobOwner(&model.Owner{}) }) + return errors.Trace(err) } @@ -163,12 +170,12 @@ func (d *ddl) Start() error { func (d *ddl) start() { d.quitCh = make(chan struct{}) d.wait.Add(2) - go d.onExecute() - go d.onWorker() + go d.onBackgroundWorker() + go d.onDDLWorker() // for every start, we will send a fake job to let worker // check owner first and try to find whether a job exists and run. - asyncNotify(d.jobCh) - asyncNotify(d.dropTaskCh) + asyncNotify(d.ddlJobCh) + asyncNotify(d.bgJobCh) } func (d *ddl) close() { @@ -261,7 +268,7 @@ func (d *ddl) CreateSchema(ctx context.Context, schema model.CIStr, charsetInfo Args: []interface{}{dbInfo}, } - err = d.startJob(ctx, job) + err = d.startDDLJob(ctx, job) err = d.hook.OnChanged(err) return errors.Trace(err) } @@ -278,7 +285,7 @@ func (d *ddl) DropSchema(ctx context.Context, schema model.CIStr) (err error) { Type: model.ActionDropSchema, } - err = d.startJob(ctx, job) + err = d.startDDLJob(ctx, job) err = d.hook.OnChanged(err) return errors.Trace(err) } @@ -525,7 +532,7 @@ func (d *ddl) CreateTable(ctx context.Context, ident table.Ident, colDefs []*col Args: []interface{}{tbInfo}, } - err = d.startJob(ctx, job) + err = d.startDDLJob(ctx, job) err = d.hook.OnChanged(err) return errors.Trace(err) } @@ -618,7 +625,7 @@ func (d *ddl) AddColumn(ctx context.Context, ti table.Ident, spec *AlterSpecific Args: []interface{}{&col.ColumnInfo, spec.Position, 0}, } - err = d.startJob(ctx, job) + err = d.startDDLJob(ctx, job) err = d.hook.OnChanged(err) return errors.Trace(err) } @@ -649,7 +656,7 @@ func (d *ddl) DropColumn(ctx context.Context, ti table.Ident, colName model.CISt Args: []interface{}{colName}, } - err = d.startJob(ctx, job) + err = d.startDDLJob(ctx, job) err = d.hook.OnChanged(err) return errors.Trace(err) } @@ -673,7 +680,7 @@ func (d *ddl) DropTable(ctx context.Context, ti table.Ident) (err error) { Type: model.ActionDropTable, } - err = d.startJob(ctx, job) + err = d.startDDLJob(ctx, job) err = d.hook.OnChanged(err) return errors.Trace(err) } @@ -701,7 +708,7 @@ func (d *ddl) CreateIndex(ctx context.Context, ti table.Ident, unique bool, inde Args: []interface{}{unique, indexName, indexID, idxColNames}, } - err = d.startJob(ctx, job) + err = d.startDDLJob(ctx, job) err = d.hook.OnChanged(err) return errors.Trace(err) } @@ -725,7 +732,7 @@ func (d *ddl) DropIndex(ctx context.Context, ti table.Ident, indexName model.CIS Args: []interface{}{indexName}, } - err = d.startJob(ctx, job) + err = d.startDDLJob(ctx, job) err = d.hook.OnChanged(err) return errors.Trace(err) } diff --git a/ddl/worker.go b/ddl/ddl_worker.go similarity index 82% rename from ddl/worker.go rename to ddl/ddl_worker.go index 73de0c4619..b3ab3cc565 100644 --- a/ddl/worker.go +++ b/ddl/ddl_worker.go @@ -25,7 +25,7 @@ import ( "github.com/pingcap/tidb/terror" ) -func (d *ddl) startJob(ctx context.Context, job *model.Job) error { +func (d *ddl) startDDLJob(ctx context.Context, job *model.Job) error { // for every DDL, we must commit current transaction. if err := ctx.FinishTxn(false); err != nil { return errors.Trace(err) @@ -49,7 +49,7 @@ func (d *ddl) startJob(ctx context.Context, job *model.Job) error { } // notice worker that we push a new job and wait the job done. - asyncNotify(d.jobCh) + asyncNotify(d.ddlJobCh) log.Warnf("[ddl] start DDL job %v", job) @@ -63,16 +63,16 @@ func (d *ddl) startJob(ctx context.Context, job *model.Job) error { defer ticker.Stop() for { select { - case <-d.jobDoneCh: + case <-d.ddlJobDoneCh: case <-ticker.C: } - historyJob, err = d.getHistoryJob(jobID) + historyJob, err = d.getHistoryDDLJob(jobID) if err != nil { - log.Errorf("[ddl] get history job err %v, check again", err) + log.Errorf("[ddl] get history DDL job err %v, check again", err) continue } else if historyJob == nil { - log.Warnf("[ddl] job %d is not in history, maybe not run", jobID) + log.Warnf("[ddl] DDL job %d is not in history, maybe not run", jobID) continue } @@ -85,7 +85,7 @@ func (d *ddl) startJob(ctx context.Context, job *model.Job) error { } } -func (d *ddl) getHistoryJob(id int64) (*model.Job, error) { +func (d *ddl) getHistoryDDLJob(id int64) (*model.Job, error) { var job *model.Job err := kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error { @@ -111,9 +111,9 @@ func (d *ddl) checkOwner(t *meta.Meta, flag string) (*model.Owner, error) { switch flag { case ddlJobFlag: - owner, err = t.GetDDLOwner() - case ddlTaskFlag: - owner, err = t.GetDDLTaskOwner() + owner, err = t.GetDDLJobOwner() + case bgJobFlag: + owner, err = t.GetBgJobOwner() default: err = errors.Errorf("invalid ddl flag %v", flag) } @@ -132,8 +132,8 @@ func (d *ddl) checkOwner(t *meta.Meta, flag string) (*model.Owner, error) { // the owner will update its owner status every 2 * lease time, so here we use // 4 * lease to check its timeout. maxTimeout := int64(4 * d.lease) - if flag == ddlTaskFlag { - // task doesn't need to guarantee other servers update the schema. + if flag == bgJobFlag { + // backgroun job doesn't need to guarantee other servers update the schema. maxTimeout = int64(2 * d.lease) } if owner.OwnerID == d.uuid || now-owner.LastUpdateTS > maxTimeout { @@ -142,36 +142,36 @@ func (d *ddl) checkOwner(t *meta.Meta, flag string) (*model.Owner, error) { // update status. switch flag { case ddlJobFlag: - err = t.SetDDLOwner(owner) - case ddlTaskFlag: - err = t.SetDDLTaskOwner(owner) + err = t.SetDDLJobOwner(owner) + case bgJobFlag: + err = t.SetBgJobOwner(owner) } if err != nil { return nil, errors.Trace(err) } - log.Debugf("[ddl] become %s owner %s", flag, owner.OwnerID) + log.Debugf("[ddl] become %s job owner %s", flag, owner.OwnerID) } if owner.OwnerID != d.uuid { - log.Debugf("[ddl] not %s owner, owner is %s", flag, owner.OwnerID) + log.Debugf("[ddl] not %s job owner, owner is %s", flag, owner.OwnerID) return nil, errors.Trace(ErrNotOwner) } return owner, nil } -func (d *ddl) getFirstJob(t *meta.Meta) (*model.Job, error) { +func (d *ddl) getFirstDDLJob(t *meta.Meta) (*model.Job, error) { job, err := t.GetDDLJob(0) return job, errors.Trace(err) } // every time we enter another state except final state, we must call this function. -func (d *ddl) updateJob(t *meta.Meta, job *model.Job) error { +func (d *ddl) updateDDLJob(t *meta.Meta, job *model.Job) error { err := t.UpdateDDLJob(0, job) return errors.Trace(err) } -func (d *ddl) finishJob(t *meta.Meta, job *model.Job) error { +func (d *ddl) finishDDLJob(t *meta.Meta, job *model.Job) error { log.Warnf("[ddl] finish DDL job %v", job) // done, notice and run next job. _, err := t.DeQueueDDLJob() @@ -180,7 +180,7 @@ func (d *ddl) finishJob(t *meta.Meta, job *model.Job) error { } switch job.Type { case model.ActionDropSchema, model.ActionDropTable: - if err = d.prepareTask(job); err != nil { + if err = d.prepareBgJob(job); err != nil { return errors.Trace(err) } } @@ -196,11 +196,11 @@ var ErrNotOwner = errors.New("DDL: not owner") var ErrWorkerClosed = errors.New("DDL: worker is closed") const ( - ddlJobFlag = "job" - ddlTaskFlag = "task" + ddlJobFlag = "ddl" + bgJobFlag = "background" ) -func (d *ddl) handleJobQueue() error { +func (d *ddl) handleDDLJobQueue() error { for { if d.isClosed() { return nil @@ -221,7 +221,7 @@ func (d *ddl) handleJobQueue() error { // become the owner // get the first job and run - job, err = d.getFirstJob(t) + job, err = d.getFirstDDLJob(t) if job == nil || err != nil { return errors.Trace(err) } @@ -246,12 +246,12 @@ func (d *ddl) handleJobQueue() error { // if run job meets error, we will save this error in job Error // and retry later if the job is not cancelled. - d.runJob(t, job) + d.runDDLJob(t, job) if job.IsFinished() { - err = d.finishJob(t, job) + err = d.finishDDLJob(t, job) } else { - err = d.updateJob(t, job) + err = d.updateDDLJob(t, job) } if err != nil { return errors.Trace(err) @@ -260,7 +260,7 @@ func (d *ddl) handleJobQueue() error { // running job may cost some time, so here we must update owner status to // prevent other become the owner. owner.LastUpdateTS = time.Now().UnixNano() - err = t.SetDDLOwner(owner) + err = t.SetDDLJobOwner(owner) return errors.Trace(err) }) @@ -281,8 +281,8 @@ func (d *ddl) handleJobQueue() error { } if job.IsFinished() { - d.startTask(job.Type) - asyncNotify(d.jobDoneCh) + d.startBgJob(job.Type) + asyncNotify(d.ddlJobDoneCh) } } } @@ -295,9 +295,9 @@ func chooseLeaseTime(n1 time.Duration, n2 time.Duration) time.Duration { return n2 } -// onWorker is for async online schema change, it will try to become the owner first, +// onDDLWorker is for async online schema change, it will try to become the owner first, // then wait or pull the job queue to handle a schema change job. -func (d *ddl) onWorker() { +func (d *ddl) onDDLWorker() { defer d.wait.Done() // we use 4 * lease time to check owner's timeout, so here, we will update owner's status @@ -311,19 +311,19 @@ func (d *ddl) onWorker() { select { case <-ticker.C: log.Debugf("[ddl] wait %s to check DDL status again", checkTime) - case <-d.jobCh: + case <-d.ddlJobCh: case <-d.quitCh: return } - err := d.handleJobQueue() + err := d.handleDDLJobQueue() if err != nil { - log.Errorf("[ddl] handle job err %v", errors.ErrorStack(err)) + log.Errorf("[ddl] handle ddl job err %v", errors.ErrorStack(err)) } } } -func (d *ddl) runJob(t *meta.Meta, job *model.Job) { +func (d *ddl) runDDLJob(t *meta.Meta, job *model.Job) { if job.IsFinished() { return } @@ -351,14 +351,14 @@ func (d *ddl) runJob(t *meta.Meta, job *model.Job) { default: // invalid job, cancel it. job.State = model.JobCancelled - err = errors.Errorf("invalid job %v", job) + err = errors.Errorf("invalid ddl job %v", job) } // saves error in job, so that others can know error happens. if err != nil { // if job is not cancelled, we should log this error. if job.State != model.JobCancelled { - log.Errorf("run job err %v", errors.ErrorStack(err)) + log.Errorf("run ddl job err %v", errors.ErrorStack(err)) } job.Error = err.Error() diff --git a/ddl/worker_test.go b/ddl/ddl_worker_test.go similarity index 91% rename from ddl/worker_test.go rename to ddl/ddl_worker_test.go index 1fa415fb16..ef0a8ed9a9 100644 --- a/ddl/worker_test.go +++ b/ddl/ddl_worker_test.go @@ -75,19 +75,19 @@ func (s *testDDLSuite) TestCheckOwner(c *C) { time.Sleep(lease) testCheckOwner(c, d1, true, ddlJobFlag) - testCheckOwner(c, d1, true, ddlTaskFlag) + testCheckOwner(c, d1, true, bgJobFlag) d2 := newDDL(store, nil, nil, lease) defer d2.close() testCheckOwner(c, d2, false, ddlJobFlag) - testCheckOwner(c, d2, false, ddlTaskFlag) + testCheckOwner(c, d2, false, bgJobFlag) d1.close() time.Sleep(6 * lease) testCheckOwner(c, d2, true, ddlJobFlag) - testCheckOwner(c, d2, true, ddlTaskFlag) + testCheckOwner(c, d2, true, bgJobFlag) d2.SetLease(1 * time.Second) @@ -101,7 +101,7 @@ func (s *testDDLSuite) TestCheckOwner(c *C) { c.Assert(err, IsNil) testCheckOwner(c, d1, true, ddlJobFlag) - testCheckOwner(c, d1, true, ddlTaskFlag) + testCheckOwner(c, d1, true, bgJobFlag) d2.SetLease(1 * time.Second) d2.SetLease(2 * time.Second) @@ -125,7 +125,7 @@ func (s *testDDLSuite) TestSchemaError(c *C) { ctx := testNewContext(c, d) - err := d.startJob(ctx, job) + err := d.startDDLJob(ctx, job) c.Assert(err, NotNil) testCheckJobCancelled(c, d, job) } @@ -148,7 +148,7 @@ func (s *testDDLSuite) TestTableError(c *C) { ctx := testNewContext(c, d) - err := d.startJob(ctx, job) + err := d.startDDLJob(ctx, job) c.Assert(err, NotNil) testCheckJobCancelled(c, d, job) @@ -157,7 +157,7 @@ func (s *testDDLSuite) TestTableError(c *C) { tblInfo := testTableInfo(c, d, "t", 3) job.Args = []interface{}{tblInfo} - err = d.startJob(ctx, job) + err = d.startDDLJob(ctx, job) c.Assert(err, NotNil) testCheckJobCancelled(c, d, job) @@ -167,7 +167,7 @@ func (s *testDDLSuite) TestTableError(c *C) { Type: model.ActionDropTable, } - err = d.startJob(ctx, job) + err = d.startDDLJob(ctx, job) c.Assert(err, NotNil) testCheckJobCancelled(c, d, job) @@ -180,7 +180,7 @@ func (s *testDDLSuite) TestTableError(c *C) { Type: model.ActionDropTable, } - err = d.startJob(ctx, job) + err = d.startDDLJob(ctx, job) c.Assert(err, NotNil) testCheckJobCancelled(c, d, job) @@ -217,7 +217,7 @@ func (s *testDDLSuite) TestIndexError(c *C) { Type: model.ActionAddIndex, } - err := d.startJob(ctx, job) + err := d.startDDLJob(ctx, job) c.Assert(err, NotNil) testCheckJobCancelled(c, d, job) @@ -227,7 +227,7 @@ func (s *testDDLSuite) TestIndexError(c *C) { Type: model.ActionDropIndex, } - err = d.startJob(ctx, job) + err = d.startDDLJob(ctx, job) c.Assert(err, NotNil) testCheckJobCancelled(c, d, job) @@ -243,7 +243,7 @@ func (s *testDDLSuite) TestIndexError(c *C) { Type: model.ActionAddIndex, Args: []interface{}{1}, } - err = d.startJob(ctx, job) + err = d.startDDLJob(ctx, job) c.Assert(err, NotNil) testCheckJobCancelled(c, d, job) @@ -253,7 +253,7 @@ func (s *testDDLSuite) TestIndexError(c *C) { Type: model.ActionAddIndex, Args: []interface{}{false, model.NewCIStr("t"), []*coldef.IndexColName{{ColumnName: "c", Length: 256}}}, } - err = d.startJob(ctx, job) + err = d.startDDLJob(ctx, job) c.Assert(err, NotNil) testCheckJobCancelled(c, d, job) @@ -263,7 +263,7 @@ func (s *testDDLSuite) TestIndexError(c *C) { Type: model.ActionAddIndex, Args: []interface{}{false, model.NewCIStr("c1_index"), []*coldef.IndexColName{{ColumnName: "c", Length: 256}}}, } - err = d.startJob(ctx, job) + err = d.startDDLJob(ctx, job) c.Assert(err, NotNil) testCheckJobCancelled(c, d, job) @@ -275,7 +275,7 @@ func (s *testDDLSuite) TestIndexError(c *C) { Type: model.ActionAddIndex, Args: []interface{}{false, model.NewCIStr("c1_index"), []*coldef.IndexColName{{ColumnName: "c1", Length: 256}}}, } - err = d.startJob(ctx, job) + err = d.startDDLJob(ctx, job) c.Assert(err, NotNil) testCheckJobCancelled(c, d, job) @@ -285,7 +285,7 @@ func (s *testDDLSuite) TestIndexError(c *C) { Type: model.ActionDropIndex, Args: []interface{}{1}, } - err = d.startJob(ctx, job) + err = d.startDDLJob(ctx, job) c.Assert(err, NotNil) testCheckJobCancelled(c, d, job) @@ -297,7 +297,7 @@ func (s *testDDLSuite) TestIndexError(c *C) { Type: model.ActionDropIndex, Args: []interface{}{model.NewCIStr("c1_index")}, } - err = d.startJob(ctx, job) + err = d.startDDLJob(ctx, job) c.Assert(err, NotNil) testCheckJobCancelled(c, d, job) } @@ -319,7 +319,7 @@ func (s *testDDLSuite) TestColumnError(c *C) { Type: model.ActionAddColumn, } - err := d.startJob(ctx, job) + err := d.startDDLJob(ctx, job) c.Assert(err, NotNil) testCheckJobCancelled(c, d, job) @@ -329,7 +329,7 @@ func (s *testDDLSuite) TestColumnError(c *C) { Type: model.ActionDropColumn, } - err = d.startJob(ctx, job) + err = d.startDDLJob(ctx, job) c.Assert(err, NotNil) testCheckJobCancelled(c, d, job) @@ -358,7 +358,7 @@ func (s *testDDLSuite) TestColumnError(c *C) { Args: []interface{}{col, pos, 0}, } - err = d.startJob(ctx, job) + err = d.startDDLJob(ctx, job) c.Assert(err, NotNil) testCheckJobCancelled(c, d, job) @@ -369,7 +369,7 @@ func (s *testDDLSuite) TestColumnError(c *C) { Args: []interface{}{1}, } - err = d.startJob(ctx, job) + err = d.startDDLJob(ctx, job) c.Assert(err, NotNil) testCheckJobCancelled(c, d, job) } diff --git a/ddl/index_test.go b/ddl/index_test.go index ded6807a80..a132de3514 100644 --- a/ddl/index_test.go +++ b/ddl/index_test.go @@ -67,7 +67,7 @@ func testCreateIndex(c *C, ctx context.Context, d *ddl, dbInfo *model.DBInfo, tb Args: []interface{}{unique, model.NewCIStr(indexName), id, []*coldef.IndexColName{{ColumnName: colName, Length: 256}}}, } - err = d.startJob(ctx, job) + err = d.startDDLJob(ctx, job) c.Assert(err, IsNil) return job } @@ -80,7 +80,7 @@ func testDropIndex(c *C, ctx context.Context, d *ddl, dbInfo *model.DBInfo, tblI Args: []interface{}{model.NewCIStr(indexName)}, } - err := d.startJob(ctx, job) + err := d.startDDLJob(ctx, job) c.Assert(err, IsNil) return job } diff --git a/ddl/reorg.go b/ddl/reorg.go index afaee3d2ef..a00c072974 100644 --- a/ddl/reorg.go +++ b/ddl/reorg.go @@ -141,7 +141,7 @@ func (d *ddl) isReorgRunnable(txn kv.Transaction) error { } t := meta.NewMeta(txn) - owner, err := t.GetDDLOwner() + owner, err := t.GetDDLJobOwner() if err != nil { return errors.Trace(err) } else if owner == nil || owner.OwnerID != d.uuid { diff --git a/ddl/schema_test.go b/ddl/schema_test.go index 99d9537ab5..0afd7eeaae 100644 --- a/ddl/schema_test.go +++ b/ddl/schema_test.go @@ -48,7 +48,7 @@ func testCreateSchema(c *C, ctx context.Context, d *ddl, dbInfo *model.DBInfo) * Args: []interface{}{dbInfo}, } - err := d.startJob(ctx, job) + err := d.startDDLJob(ctx, job) c.Assert(err, IsNil) return job } @@ -59,13 +59,13 @@ func testDropSchema(c *C, ctx context.Context, d *ddl, dbInfo *model.DBInfo) *mo Type: model.ActionDropSchema, } - err := d.startJob(ctx, job) + err := d.startDDLJob(ctx, job) c.Assert(err, IsNil) return job } func checkDrop(c *C, t *meta.Meta) bool { - task, err := t.GetDDLTask(0) + task, err := t.GetBgJob(0) c.Assert(err, IsNil) if task == nil { return true @@ -156,7 +156,7 @@ func (s *testSchemaSuite) TestSchema(c *C) { Type: model.ActionDropSchema, } - err := d1.startJob(ctx, job) + err := d1.startDDLJob(ctx, job) c.Assert(terror.ErrorEqual(err, infoschema.DatabaseNotExists), IsTrue) } @@ -195,7 +195,7 @@ func (s *testSchemaSuite) TestSchemaWaitJob(c *C) { Args: []interface{}{dbInfo}, } - err = d2.startJob(ctx, job) + err = d2.startDDLJob(ctx, job) c.Assert(err, NotNil) testCheckJobCancelled(c, d2, job) @@ -207,7 +207,7 @@ func testRunInterruptedJob(c *C, d *ddl, job *model.Job) { ctx := mock.NewContext() done := make(chan error, 1) go func() { - done <- d.startJob(ctx, job) + done <- d.startDDLJob(ctx, job) }() ticker := time.NewTicker(d.lease * 1) diff --git a/ddl/stat_test.go b/ddl/stat_test.go index b356d16a69..5415b9d83c 100644 --- a/ddl/stat_test.go +++ b/ddl/stat_test.go @@ -59,7 +59,7 @@ func (s *testStatSuite) TestStat(c *C) { ctx := mock.NewContext() done := make(chan error, 1) go func() { - done <- d.startJob(ctx, job) + done <- d.startDDLJob(ctx, job) }() ticker := time.NewTicker(d.lease * 1) diff --git a/ddl/table_test.go b/ddl/table_test.go index c6353f078e..eeb0d9bb6e 100644 --- a/ddl/table_test.go +++ b/ddl/table_test.go @@ -77,7 +77,7 @@ func testCreateTable(c *C, ctx context.Context, d *ddl, dbInfo *model.DBInfo, tb Args: []interface{}{tblInfo}, } - err := d.startJob(ctx, job) + err := d.startDDLJob(ctx, job) c.Assert(err, IsNil) return job } @@ -89,7 +89,7 @@ func testDropTable(c *C, ctx context.Context, d *ddl, dbInfo *model.DBInfo, tblI Type: model.ActionDropTable, } - err := d.startJob(ctx, job) + err := d.startDDLJob(ctx, job) c.Assert(err, IsNil) return job } @@ -168,7 +168,7 @@ func (s *testTableSuite) TestTable(c *C) { Args: []interface{}{newTblInfo}, } - err := d.startJob(ctx, job) + err := d.startDDLJob(ctx, job) c.Assert(err, NotNil) testCheckJobCancelled(c, d, job) diff --git a/inspectkv/inspectkv.go b/inspectkv/inspectkv.go index a29c1463bc..b81a9f03ff 100644 --- a/inspectkv/inspectkv.go +++ b/inspectkv/inspectkv.go @@ -41,7 +41,7 @@ func GetDDLInfo(txn kv.Transaction) (*DDLInfo, error) { info := &DDLInfo{} t := meta.NewMeta(txn) - info.Owner, err = t.GetDDLOwner() + info.Owner, err = t.GetDDLJobOwner() if err != nil { return nil, errors.Trace(err) } diff --git a/inspectkv/inspectkv_test.go b/inspectkv/inspectkv_test.go index 28778062dc..5305d44942 100644 --- a/inspectkv/inspectkv_test.go +++ b/inspectkv/inspectkv_test.go @@ -130,7 +130,7 @@ func (s *testSuite) TestGetDDLInfo(c *C) { t := meta.NewMeta(txn) owner := &model.Owner{OwnerID: "owner"} - err = t.SetDDLOwner(owner) + err = t.SetDDLJobOwner(owner) c.Assert(err, IsNil) dbInfo2 := &model.DBInfo{ ID: 2, diff --git a/meta/meta.go b/meta/meta.go index 9511e16642..aeafa0b3e6 100644 --- a/meta/meta.go +++ b/meta/meta.go @@ -416,13 +416,13 @@ func (m *Meta) GetTable(dbID int64, tableID int64) (*model.TableInfo, error) { // to operate DDL jobs, and dispatch them to MR Jobs. var ( - mDDLOwnerKey = []byte("DDLOwner") + mDDLJobOwnerKey = []byte("DDLJobOwner") mDDLJobListKey = []byte("DDLJobList") mDDLJobHistoryKey = []byte("DDLJobHistory") mDDLJobReorgKey = []byte("DDLJobReorg") ) -func (m *Meta) getDDLOwner(key []byte) (*model.Owner, error) { +func (m *Meta) getJobOwner(key []byte) (*model.Owner, error) { value, err := m.txn.Get(key) if err != nil || value == nil { return nil, errors.Trace(err) @@ -433,12 +433,12 @@ func (m *Meta) getDDLOwner(key []byte) (*model.Owner, error) { return owner, errors.Trace(err) } -// GetDDLOwner gets the current owner for DDL. -func (m *Meta) GetDDLOwner() (*model.Owner, error) { - return m.getDDLOwner(mDDLOwnerKey) +// GetDDLJobOwner gets the current owner for DDL. +func (m *Meta) GetDDLJobOwner() (*model.Owner, error) { + return m.getJobOwner(mDDLJobOwnerKey) } -func (m *Meta) setDDLOwner(key []byte, o *model.Owner) error { +func (m *Meta) setJobOwner(key []byte, o *model.Owner) error { b, err := json.Marshal(o) if err != nil { return errors.Trace(err) @@ -446,9 +446,9 @@ func (m *Meta) setDDLOwner(key []byte, o *model.Owner) error { return m.txn.Set(key, b) } -// SetDDLOwner sets the current owner for DDL. -func (m *Meta) SetDDLOwner(o *model.Owner) error { - return m.setDDLOwner(mDDLOwnerKey, o) +// SetDDLJobOwner sets the current owner for DDL. +func (m *Meta) SetDDLJobOwner(o *model.Owner) error { + return m.setJobOwner(mDDLJobOwnerKey, o) } func (m *Meta) enQueueDDLJob(key []byte, job *model.Job) error { @@ -587,64 +587,64 @@ func (m *Meta) GetDDLReorgHandle(job *model.Job) (int64, error) { return value, errors.Trace(err) } -// DDL task structure -// DDLTaskOnwer: []byte -// DDLTaskList: list tasks -// DDLTaskHistory: hash -// DDLTaskReorg: hash +// DDL background job structure +// BgJobOnwer: []byte +// BgJobList: list tasks +// BgJobHistory: hash +// BgJobReorg: hash // -// for multi DDL executor, only one can become the owner -// to operate DDL tasks, and dispatch them to MR tasks. +// for multi background worker, only one can become the owner +// to operate background job, and dispatch them to MR background job. var ( - mDDLTaskOwnerKey = []byte("DDLTaskOwner") - mDDLTaskListKey = []byte("DDLTaskList") - mDDLTaskHistoryKey = []byte("DDLTaskHistory") + mBgJobOwnerKey = []byte("BgJobOwner") + mBgJobListKey = []byte("BgJobList") + mBgJobHistoryKey = []byte("BgJobHistory") ) -// UpdateDDLTask updates the DDL task with index. -func (m *Meta) UpdateDDLTask(index int64, task *model.Job) error { - return m.updateDDLJob(index, task, mDDLTaskListKey) +// UpdateBgJob updates the background job with index. +func (m *Meta) UpdateBgJob(index int64, task *model.Job) error { + return m.updateDDLJob(index, task, mBgJobListKey) } -// GetDDLTask returns the DDL task with index. -func (m *Meta) GetDDLTask(index int64) (*model.Job, error) { - task, err := m.getDDLJob(mDDLTaskListKey, index) +// GetBgJob returns the background job with index. +func (m *Meta) GetBgJob(index int64) (*model.Job, error) { + task, err := m.getDDLJob(mBgJobListKey, index) return task, errors.Trace(err) } -// EnQueueDDLTask adds a DDL task to the list. -func (m *Meta) EnQueueDDLTask(task *model.Job) error { - return m.enQueueDDLJob(mDDLTaskListKey, task) +// EnQueueBgJob adds a background job to the list. +func (m *Meta) EnQueueBgJob(task *model.Job) error { + return m.enQueueDDLJob(mBgJobListKey, task) } -// DDLTaskLength returns the DDL task length. -func (m *Meta) DDLTaskLength() (int64, error) { - return m.txn.LLen(mDDLTaskListKey) +// BgJobLength returns the background job length. +func (m *Meta) BgJobLength() (int64, error) { + return m.txn.LLen(mBgJobListKey) } -// AddHistoryDDLTask adds DDL task to history. -func (m *Meta) AddHistoryDDLTask(task *model.Job) error { - return m.addHistoryDDLJob(mDDLTaskHistoryKey, task) +// AddHistoryBgJob adds background job to history. +func (m *Meta) AddHistoryBgJob(task *model.Job) error { + return m.addHistoryDDLJob(mBgJobHistoryKey, task) } -// GetHistoryDDLTask gets a history DDL task. -func (m *Meta) GetHistoryDDLTask(id int64) (*model.Job, error) { - return m.getHistoryDDLJob(mDDLTaskHistoryKey, id) +// GetHistoryBgJob gets a history background job. +func (m *Meta) GetHistoryBgJob(id int64) (*model.Job, error) { + return m.getHistoryDDLJob(mBgJobHistoryKey, id) } -// DeQueueDDLTask pops a DDL task from the list. -func (m *Meta) DeQueueDDLTask() (*model.Job, error) { - return m.deQueueDDLJob(mDDLTaskListKey) +// DeQueueBgJob pops a background job from the list. +func (m *Meta) DeQueueBgJob() (*model.Job, error) { + return m.deQueueDDLJob(mBgJobListKey) } -// GetDDLTaskOwner gets the current task owner for DDL. -func (m *Meta) GetDDLTaskOwner() (*model.Owner, error) { - return m.getDDLOwner(mDDLTaskOwnerKey) +// GetBgJobOwner gets the current background job owner. +func (m *Meta) GetBgJobOwner() (*model.Owner, error) { + return m.getJobOwner(mBgJobOwnerKey) } -// SetDDLTaskOwner sets the current task owner for DDL. -func (m *Meta) SetDDLTaskOwner(o *model.Owner) error { - return m.setDDLOwner(mDDLTaskOwnerKey, o) +// SetBgJobOwner sets the current background job owner. +func (m *Meta) SetBgJobOwner(o *model.Owner) error { + return m.setJobOwner(mBgJobOwnerKey, o) } diff --git a/meta/meta_test.go b/meta/meta_test.go index 7f351eb3af..00ae7f32cc 100644 --- a/meta/meta_test.go +++ b/meta/meta_test.go @@ -175,9 +175,9 @@ func (s *testSuite) TestDDL(c *C) { t := meta.NewMeta(txn) owner := &model.Owner{OwnerID: "1"} - err = t.SetDDLOwner(owner) + err = t.SetDDLJobOwner(owner) c.Assert(err, IsNil) - ov, err := t.GetDDLOwner() + ov, err := t.GetDDLJobOwner() c.Assert(err, IsNil) c.Assert(owner, DeepEquals, ov) @@ -219,36 +219,36 @@ func (s *testSuite) TestDDL(c *C) { c.Assert(v, DeepEquals, job) // DDL task test - err = t.SetDDLTaskOwner(owner) + err = t.SetBgJobOwner(owner) c.Assert(err, IsNil) - ov, err = t.GetDDLTaskOwner() + ov, err = t.GetBgJobOwner() c.Assert(err, IsNil) c.Assert(owner, DeepEquals, ov) task := &model.Job{ID: 1} - err = t.EnQueueDDLTask(task) + err = t.EnQueueBgJob(task) c.Assert(err, IsNil) - n, err = t.DDLTaskLength() + n, err = t.BgJobLength() c.Assert(err, IsNil) c.Assert(n, Equals, int64(1)) - v, err = t.GetDDLTask(0) + v, err = t.GetBgJob(0) c.Assert(err, IsNil) c.Assert(v, DeepEquals, task) - v, err = t.GetDDLTask(1) + v, err = t.GetBgJob(1) c.Assert(err, IsNil) c.Assert(v, IsNil) task.ID = 2 - err = t.UpdateDDLTask(0, task) + err = t.UpdateBgJob(0, task) c.Assert(err, IsNil) - v, err = t.DeQueueDDLTask() + v, err = t.DeQueueBgJob() c.Assert(err, IsNil) c.Assert(v, DeepEquals, task) - err = t.AddHistoryDDLTask(task) + err = t.AddHistoryBgJob(task) c.Assert(err, IsNil) - v, err = t.GetHistoryDDLTask(2) + v, err = t.GetHistoryBgJob(2) c.Assert(err, IsNil) c.Assert(v, DeepEquals, task)