diff --git a/ddl/bg_worker.go b/ddl/bg_worker.go index 55e8fa3b2e..cb80ece4c3 100644 --- a/ddl/bg_worker.go +++ b/ddl/bg_worker.go @@ -41,7 +41,7 @@ func (d *ddl) handleBgJobQueue() error { return errors.Trace(err) } - // get the first background job and run + // Get the first background job and run it. job, err = d.getFirstBgJob(t) if err != nil { return errors.Trace(err) @@ -155,8 +155,8 @@ func (d *ddl) finishBgJob(t *meta.Meta, job *model.Job) error { func (d *ddl) onBackgroundWorker() { defer d.wait.Done() - // we use 4 * lease time to check owner's timeout, so here, we will update owner's status - // every 2 * lease time, if lease is 0, we will use default 10s. + // We use 4 * lease time to check owner's timeout, so here, we will update owner's status + // every 2 * lease time. If lease is 0, we will use default 10s. checkTime := chooseLeaseTime(2*d.lease, 10*time.Second) ticker := time.NewTicker(checkTime) diff --git a/ddl/column.go b/ddl/column.go index c00dd4ff53..1b46a73a23 100644 --- a/ddl/column.go +++ b/ddl/column.go @@ -46,7 +46,7 @@ func (d *ddl) adjustColumnOffset(columns []*model.ColumnInfo, indices []*model.I columns[offset].Offset = len(columns) - 1 } - // TODO: index can't cover the add/remove column with offset now, we may check this later. + // TODO: Index can't cover the add/remove column with offset now, we may check this later. // Update index column offset info. for _, idx := range indices { @@ -59,7 +59,7 @@ func (d *ddl) adjustColumnOffset(columns []*model.ColumnInfo, indices []*model.I } } -func (d *ddl) addColumn(tblInfo *model.TableInfo, colInfo *model.ColumnInfo, pos *ast.ColumnPosition) (*model.ColumnInfo, int, error) { +func (d *ddl) createColumnInfo(tblInfo *model.TableInfo, colInfo *model.ColumnInfo, pos *ast.ColumnPosition) (*model.ColumnInfo, int, error) { // Check column name duplicate. cols := tblInfo.Columns position := len(cols) @@ -111,17 +111,16 @@ func (d *ddl) onAddColumn(t *meta.Meta, job *model.Job) error { columnInfo := findCol(tblInfo.Columns, col.Name.L) if columnInfo != nil { if columnInfo.State == model.StatePublic { - // we already have a column with same column name + // We already have a column with the same column name. job.State = model.JobCancelled return infoschema.ErrColumnExists.Gen("ADD COLUMN: column already exist %s", col.Name.L) } } else { - columnInfo, offset, err = d.addColumn(tblInfo, col, pos) + columnInfo, offset, err = d.createColumnInfo(tblInfo, col, pos) if err != nil { job.State = model.JobCancelled return errors.Trace(err) } - // Set offset arg to job. if offset != 0 { job.Args = []interface{}{columnInfo, pos, offset} @@ -139,27 +138,24 @@ func (d *ddl) onAddColumn(t *meta.Meta, job *model.Job) error { job.SchemaState = model.StateDeleteOnly columnInfo.State = model.StateDeleteOnly err = t.UpdateTable(schemaID, tblInfo) - return errors.Trace(err) case model.StateDeleteOnly: // delete only -> write only job.SchemaState = model.StateWriteOnly columnInfo.State = model.StateWriteOnly err = t.UpdateTable(schemaID, tblInfo) - return errors.Trace(err) case model.StateWriteOnly: // write only -> reorganization job.SchemaState = model.StateWriteReorganization columnInfo.State = model.StateWriteReorganization - // initialize SnapshotVer to 0 for later reorganization check. + // Initialize SnapshotVer to 0 for later reorganization check. 
job.SnapshotVer = 0 err = t.UpdateTable(schemaID, tblInfo) - return errors.Trace(err) case model.StateWriteReorganization: // reorganization -> public - // get the current version for reorganization if we don't have + // Get the current version for reorganization if we don't have it. reorgInfo, err := d.getReorgInfo(t, job) if err != nil || reorgInfo.first { - // if we run reorg firstly, we should update the job snapshot version + // If we run reorg firstly, we should update the job snapshot version // and then run the reorg next time. return errors.Trace(err) } @@ -173,7 +169,8 @@ func (d *ddl) onAddColumn(t *meta.Meta, job *model.Job) error { return d.addTableColumn(tbl, columnInfo, reorgInfo, job) }) if terror.ErrorEqual(err, errWaitReorgTimeout) { - // if timeout, we should return, check for the owner and re-wait job done. + // If the timeout happens, we should return. + // Then check for the owner and re-wait job to finish. return nil } if err != nil { @@ -183,21 +180,20 @@ func (d *ddl) onAddColumn(t *meta.Meta, job *model.Job) error { // Adjust column offset. d.adjustColumnOffset(tblInfo.Columns, tblInfo.Indices, offset, true) - columnInfo.State = model.StatePublic - if err = t.UpdateTable(schemaID, tblInfo); err != nil { return errors.Trace(err) } - // finish this job + // Finish this job. job.SchemaState = model.StatePublic job.State = model.JobDone addTableHistoryInfo(job, ver, tblInfo) - return nil default: - return ErrInvalidColumnState.Gen("invalid column state %v", columnInfo.State) + err = ErrInvalidColumnState.Gen("invalid column state %v", columnInfo.State) } + + return errors.Trace(err) } func (d *ddl) onDropColumn(t *meta.Meta, job *model.Job) error { @@ -226,8 +222,8 @@ func (d *ddl) onDropColumn(t *meta.Meta, job *model.Job) error { colName, tblInfo.Name) } - // we don't support drop column with index covered now. - // we must drop the index first, then drop the column. + // We don't support dropping column with index covered now. + // We must drop the index first, then drop the column. for _, indexInfo := range tblInfo.Indices { for _, col := range indexInfo.Columns { if col.Name.L == colName.L { @@ -248,36 +244,31 @@ func (d *ddl) onDropColumn(t *meta.Meta, job *model.Job) error { // public -> write only job.SchemaState = model.StateWriteOnly colInfo.State = model.StateWriteOnly - - // set this column's offset to the last and reset all following columns' offset + // Set this column's offset to the last and reset all following columns' offsets. d.adjustColumnOffset(tblInfo.Columns, tblInfo.Indices, colInfo.Offset, false) - err = t.UpdateTable(schemaID, tblInfo) - return errors.Trace(err) case model.StateWriteOnly: // write only -> delete only job.SchemaState = model.StateDeleteOnly colInfo.State = model.StateDeleteOnly err = t.UpdateTable(schemaID, tblInfo) - return errors.Trace(err) case model.StateDeleteOnly: // delete only -> reorganization job.SchemaState = model.StateDeleteReorganization colInfo.State = model.StateDeleteReorganization - // initialize SnapshotVer to 0 for later reorganization check. + // Initialize SnapshotVer to 0 for later reorganization check. job.SnapshotVer = 0 err = t.UpdateTable(schemaID, tblInfo) - return errors.Trace(err) case model.StateDeleteReorganization: // reorganization -> absent reorgInfo, err := d.getReorgInfo(t, job) if err != nil || reorgInfo.first { - // if we run reorg firstly, we should update the job snapshot version + // If we run reorg firstly, we should update the job snapshot version // and then run the reorg next time. 
return errors.Trace(err) } - // all reorganization jobs done, drop this column + // All reorganization jobs are done, drop this column. newColumns := make([]*model.ColumnInfo, 0, len(tblInfo.Columns)) for _, col := range tblInfo.Columns { if col.Name.L != colName.L { @@ -289,14 +280,14 @@ func (d *ddl) onDropColumn(t *meta.Meta, job *model.Job) error { return errors.Trace(err) } - // finish this job + // Finish this job. job.SchemaState = model.StateNone job.State = model.JobDone addTableHistoryInfo(job, ver, tblInfo) - return nil default: - return ErrInvalidTableState.Gen("invalid table state %v", tblInfo.State) + err = ErrInvalidTableState.Gen("invalid table state %v", tblInfo.State) } + return errors.Trace(err) } // How to backfill column data in reorganization state? diff --git a/ddl/ddl.go b/ddl/ddl.go index c75edce568..ee235f5a96 100644 --- a/ddl/ddl.go +++ b/ddl/ddl.go @@ -54,7 +54,7 @@ var ( errWaitReorgTimeout = terror.ClassDDL.New(codeWaitReorgTimeout, "wait for reorganization timeout") errInvalidStoreVer = terror.ClassDDL.New(codeInvalidStoreVer, "invalid storage current version") - // we don't support drop column with index covered now. + // We don't support dropping column with index covered now. errCantDropColWithIndex = terror.ClassDDL.New(codeCantDropColWithIndex, "can't drop column with index") errUnsupportedAddColumn = terror.ClassDDL.New(codeUnsupportedAddColumn, "unsupported add column") @@ -120,16 +120,16 @@ type ddl struct { hook Callback hookMu sync.RWMutex store kv.Storage - // schema lease seconds. + // Schema lease seconds. lease time.Duration uuid string ddlJobCh chan struct{} ddlJobDoneCh chan struct{} - // drop database/table job runs in the background. + // Drop database/table job that runs in the background. bgJobCh chan struct{} // reorgDoneCh is for reorganization, if the reorganization job is done, // we will use this channel to notify outer. - // TODO: now we use goroutine to simulate reorganization jobs, later we may + // TODO: Now we use goroutine to simulate reorganization jobs, later we may // use a persistent job list. reorgDoneCh chan error @@ -181,7 +181,7 @@ func (d *ddl) Stop() error { return nil } - // ddl job's owner is me, clean it so other servers can compete for it quickly. + // DDL job's owner is me, clean it so other servers can compete for it quickly. return t.SetDDLJobOwner(&model.Owner{}) }) if err != nil { @@ -198,7 +198,7 @@ func (d *ddl) Stop() error { return nil } - // background job's owner is me, clean it so other servers can compete for it quickly. + // Background job's owner is me, clean it so other servers can compete for it quickly. return t.SetBgJobOwner(&model.Owner{}) }) @@ -223,8 +223,8 @@ func (d *ddl) start() { d.wait.Add(2) go d.onBackgroundWorker() go d.onDDLWorker() - // for every start, we will send a fake job to let worker - // check owner first and try to find whether a job exists and run. + // For every start, we will send a fake job to let the worker + // check the owner first and try to find whether a job exists and run it. asyncNotify(d.ddlJobCh) asyncNotify(d.bgJobCh) } @@ -259,12 +259,12 @@ func (d *ddl) SetLease(lease time.Duration) { log.Warnf("[ddl] change schema lease %s -> %s", d.lease, lease) if d.isClosed() { - // if already closed, just set lease and return + // If already closed, just set lease and return. d.lease = lease return } - // close the running worker and start again + // Close the running worker and start again.
d.close() d.lease = lease d.start() @@ -361,7 +361,7 @@ func checkTooLongTable(table model.CIStr) error { func getDefaultCharsetAndCollate() (string, string) { // TODO: TableDefaultCharset-->DatabaseDefaultCharset-->SystemDefaultCharset. - // TODO: change TableOption parser to parse collate. + // TODO: Change TableOption parser to parse collate. // This is a tmp solution. return "utf8", "utf8_unicode_ci" } @@ -372,7 +372,6 @@ func setColumnFlagWithConstraint(colMap map[string]*table.Column, v *ast.Constra for _, key := range v.Keys { c, ok := colMap[key.Column.Name.L] if !ok { - // TODO: table constraint on unknown column. continue } c.Flag |= mysql.PriKeyFlag @@ -383,7 +382,6 @@ func setColumnFlagWithConstraint(colMap map[string]*table.Column, v *ast.Constra for i, key := range v.Keys { c, ok := colMap[key.Column.Name.L] if !ok { - // TODO: table constraint on unknown column. continue } if i == 0 { @@ -402,7 +400,6 @@ func setColumnFlagWithConstraint(colMap map[string]*table.Column, v *ast.Constra for i, key := range v.Keys { c, ok := colMap[key.Column.Name.L] if !ok { - // TODO: table constraint on unknown column. continue } if i == 0 { @@ -427,7 +424,7 @@ func (d *ddl) buildColumnsAndConstraints(ctx context.Context, colDefs []*ast.Col cols = append(cols, col) colMap[colDef.Name.Name.L] = col } - // traverse table Constraints and set col.flag + // Traverse table Constraints and set col.flag. for _, v := range constraints { setColumnFlagWithConstraint(colMap, v) } @@ -878,7 +875,6 @@ func (d *ddl) CreateTable(ctx context.Context, ident ast.Ident, colDefs []*ast.C Type: model.ActionCreateTable, Args: []interface{}{tbInfo}, } - // Handle Table Options d.handleTableOptions(options, tbInfo, schema.ID) err = d.doDDLJob(ctx, job) @@ -927,7 +923,7 @@ func (d *ddl) handleTableOptions(options []*ast.TableOption, tbInfo *model.Table } func (d *ddl) AlterTable(ctx context.Context, ident ast.Ident, specs []*ast.AlterTableSpec) (err error) { - // now we only allow one schema changes at the same time. + // Now we only allow one schema change at a time. if len(specs) != 1 { return errRunMultiSchemaChanges } @@ -950,12 +946,12 @@ func (d *ddl) AlterTable(ctx context.Context, ident ast.Ident, specs []*ast.Alte case ast.ConstraintForeignKey: err = d.CreateForeignKey(ctx, ident, model.NewCIStr(constr.Name), spec.Constraint.Keys, spec.Constraint.Refer) default: - // nothing to do now. + // Nothing to do now. } case ast.AlterTableDropForeignKey: err = d.DropForeignKey(ctx, ident, model.NewCIStr(spec.Name)) default: - // nothing to do now. + // Nothing to do now. } if err != nil { @@ -1007,8 +1003,8 @@ func (d *ddl) AddColumn(ctx context.Context, ti ast.Ident, spec *ast.AlterTableS return ErrTooLongIdent.Gen("too long column %s", colName) } - // ingore table constraints now, maybe return error later - we use length(t.Cols()) as the default offset first, later we will change the + // Ignore table constraints now, maybe return an error later. + // We use length(t.Cols()) as the default offset first, and will adjust the // column's offset later. col, _, err = d.buildColumnAndConstraint(ctx, len(t.Cols()), spec.Column) if err != nil { @@ -1241,6 +1237,64 @@ func (d *ddl) DropIndex(ctx context.Context, ti ast.Ident, indexName model.CIStr return errors.Trace(err) } +func (d *ddl) doDDLJob(ctx context.Context, job *model.Job) error { + // For every DDL, we must commit the current transaction.
+ if err := ctx.CommitTxn(); err != nil { + return errors.Trace(err) + } + + // Get a global job ID and put the DDL job in the queue. + err := d.addDDLJob(ctx, job) + if err != nil { + return errors.Trace(err) + } + + // Notify the worker that we push a new job and wait for the job to be done. + asyncNotify(d.ddlJobCh) + log.Warnf("[ddl] start DDL job %v", job) + + var historyJob *model.Job + jobID := job.ID + // For a job from start to end, its state will be none -> delete only -> write only -> reorganization -> public. + // For every state change, we will wait 2 * lease time, so here the ticker check is 10 * lease. + ticker := time.NewTicker(chooseLeaseTime(10*d.lease, 10*time.Second)) + startTime := time.Now() + jobsGauge.WithLabelValues(JobType(ddlJobFlag).String(), job.Type.String()).Inc() + defer func() { + ticker.Stop() + jobsGauge.WithLabelValues(JobType(ddlJobFlag).String(), job.Type.String()).Dec() + retLabel := handleJobSucc + if err != nil { + retLabel = handleJobFailed + } + handleJobHistogram.WithLabelValues(JobType(ddlJobFlag).String(), job.Type.String(), + retLabel).Observe(time.Since(startTime).Seconds()) + }() + for { + select { + case <-d.ddlJobDoneCh: + case <-ticker.C: + } + + historyJob, err = d.getHistoryDDLJob(jobID) + if err != nil { + log.Errorf("[ddl] get history DDL job err %v, check again", err) + continue + } else if historyJob == nil { + log.Warnf("[ddl] DDL job %d is not in history, maybe not run", jobID) + continue + } + + // If a job is a history job, the state must be JobDone or JobCancel. + if historyJob.State == model.JobDone { + log.Warnf("[ddl] DDL job %d is finished", jobID) + return nil + } + + return errors.Trace(historyJob.Error) + } +} + func (d *ddl) callHookOnChanged(err error) error { d.hookMu.Lock() defer d.hookMu.Unlock() diff --git a/ddl/ddl_worker.go b/ddl/ddl_worker.go index e4780005bb..d9807f14f2 100644 --- a/ddl/ddl_worker.go +++ b/ddl/ddl_worker.go @@ -26,94 +26,34 @@ import ( "github.com/pingcap/tidb/terror" ) -func (d *ddl) doDDLJob(ctx context.Context, job *model.Job) error { - // for every DDL, we must commit current transaction. - if err := ctx.CommitTxn(); err != nil { - return errors.Trace(err) - } - var startTS uint64 - err := kv.RunInNewTxn(d.store, true, func(txn kv.Transaction) error { - t := meta.NewMeta(txn) - var err error - job.ID, err = t.GenGlobalID() - startTS = txn.StartTS() - return errors.Trace(err) - }) - if err != nil { - return errors.Trace(err) - } - ddlQuery, _ := ctx.Value(context.QueryString).(string) - job.Query = ddlQuery +// onDDLWorker is for async online schema change; it will try to become the owner first, +// then wait or pull the job queue to handle a schema change job. +func (d *ddl) onDDLWorker() { + defer d.wait.Done() - // Create a new job and queue it. - err = kv.RunInNewTxn(d.store, true, func(txn kv.Transaction) error { - t := meta.NewMeta(txn) - err1 := t.EnQueueDDLJob(job) - return errors.Trace(err1) - }) - if err != nil { - return errors.Trace(err) - } + // We use 4 * lease time to check owner's timeout, so here, we will update owner's status + // every 2 * lease time. If lease is 0, we will use default 10s. + checkTime := chooseLeaseTime(2*d.lease, 10*time.Second) - // notice worker that we push a new job and wait the job done.
- asyncNotify(d.ddlJobCh) + ticker := time.NewTicker(checkTime) + defer ticker.Stop() - log.Warnf("[ddl] start DDL job %v", job) - - var historyJob *model.Job - jobID := job.ID - // for a job from start to end, the state of it will be none -> delete only -> write only -> reorganization -> public - // for every state changes, we will wait as lease 2 * lease time, so here the ticker check is 10 * lease. - ticker := time.NewTicker(chooseLeaseTime(10*d.lease, 10*time.Second)) - startTime := time.Now() - jobsGauge.WithLabelValues(JobType(ddlJobFlag).String(), job.Type.String()).Inc() - defer func() { - ticker.Stop() - jobsGauge.WithLabelValues(JobType(ddlJobFlag).String(), job.Type.String()).Dec() - retLabel := handleJobSucc - if err != nil { - retLabel = handleJobFailed - } - handleJobHistogram.WithLabelValues(JobType(ddlJobFlag).String(), job.Type.String(), - retLabel).Observe(time.Since(startTime).Seconds()) - }() for { select { - case <-d.ddlJobDoneCh: case <-ticker.C: + log.Debugf("[ddl] wait %s to check DDL status again", checkTime) + case <-d.ddlJobCh: + case <-d.quitCh: + return } - historyJob, err = d.getHistoryDDLJob(jobID) + err := d.handleDDLJobQueue() if err != nil { - log.Errorf("[ddl] get history DDL job err %v, check again", err) - continue - } else if historyJob == nil { - log.Warnf("[ddl] DDL job %d is not in history, maybe not run", jobID) - continue + log.Errorf("[ddl] handle ddl job err %v", errors.ErrorStack(err)) } - - // if a job is a history table, the state must be JobDone or JobCancel. - if historyJob.State == model.JobDone { - return nil - } - - return errors.Trace(historyJob.Error) - } } -func (d *ddl) getHistoryDDLJob(id int64) (*model.Job, error) { - var job *model.Job - - err := kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error { - t := meta.NewMeta(txn) - var err1 error - job, err1 = t.GetHistoryDDLJob(id) - return errors.Trace(err1) - }) - - return job, errors.Trace(err) -} - func asyncNotify(ch chan struct{}) { select { case ch <- struct{}{}: @@ -208,20 +148,41 @@ func (d *ddl) getJobOwner(t *meta.Meta, flag JobType) (*model.Owner, error) { return owner, errors.Trace(err) } +// addDDLJob gets a global job ID and puts the DDL job in the DDL queue. +func (d *ddl) addDDLJob(ctx context.Context, job *model.Job) error { + job.Query, _ = ctx.Value(context.QueryString).(string) + return kv.RunInNewTxn(d.store, true, func(txn kv.Transaction) error { + t := meta.NewMeta(txn) + + var err error + job.ID, err = t.GenGlobalID() + if err != nil { + return errors.Trace(err) + } + + err = t.EnQueueDDLJob(job) + return errors.Trace(err) + }) +} + +// getFirstDDLJob gets the first DDL job from the DDL queue. func (d *ddl) getFirstDDLJob(t *meta.Meta) (*model.Job, error) { job, err := t.GetDDLJob(0) return job, errors.Trace(err) } -// every time we enter another state except final state, we must call this function. +// updateDDLJob updates the DDL job information. +// Every time we enter another state except the final state, we must call this function. func (d *ddl) updateDDLJob(t *meta.Meta, job *model.Job) error { err := t.UpdateDDLJob(0, job) return errors.Trace(err) } +// finishDDLJob deletes the finished DDL job in the DDL queue and puts it into the history queue. +// If the DDL job needs to be handled in the background, it will prepare a background job. func (d *ddl) finishDDLJob(t *meta.Meta, job *model.Job) error { - log.Warnf("[ddl] finish DDL job %v", job) - // done, notice and run next job.
+ log.Infof("[ddl] finish DDL job %v", job) + // Job is finished, notice and run the next job. _, err := t.DeQueueDDLJob() if err != nil { return errors.Trace(err) @@ -237,6 +198,20 @@ func (d *ddl) finishDDLJob(t *meta.Meta, job *model.Job) error { return errors.Trace(err) } +// getHistoryDDLJob gets a DDL job with job's ID form history queue. +func (d *ddl) getHistoryDDLJob(id int64) (*model.Job, error) { + var job *model.Job + + err := kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error { + t := meta.NewMeta(txn) + var err1 error + job, err1 = t.GetHistoryDDLJob(id) + return errors.Trace(err1) + }) + + return job, errors.Trace(err) +} + // JobType is job type, including ddl/background. type JobType int @@ -268,24 +243,23 @@ func (d *ddl) handleDDLJobQueue() error { t := meta.NewMeta(txn) owner, err := d.checkOwner(t, ddlJobFlag) if terror.ErrorEqual(err, errNotOwner) { - // we are not owner, return and retry checking later. + // We are not owner, return and retry checking later. return nil } else if err != nil { return errors.Trace(err) } - // become the owner - // get the first job and run + // We become the owner. Get the first job and run it. job, err = d.getFirstDDLJob(t) if job == nil || err != nil { return errors.Trace(err) } if job.IsRunning() { - // if we enter a new state, crash when waiting 2 * lease time, and restart quickly, + // If we enter a new state, crash when waiting 2 * lease time, and restart quickly, // we may run the job immediately again, but we don't wait enough 2 * lease time to // let other servers update the schema. - // so here we must check the elapsed time from last update, if < 2 * lease, we must + // So here we must check the elapsed time from last update, if < 2 * lease, we must // wait again. elapsed := time.Duration(time.Now().UnixNano() - job.LastUpdateTS) if elapsed > 0 && elapsed < waitTime { @@ -295,16 +269,13 @@ func (d *ddl) handleDDLJobQueue() error { } } - log.Warnf("[ddl] run DDL job %v", job) - d.hookMu.Lock() d.hook.OnJobRunBefore(job) d.hookMu.Unlock() - // if run job meets error, we will save this error in job Error + // If running job meets error, we will save this error in job Error // and retry later if the job is not cancelled. d.runDDLJob(t, job) - if job.IsFinished() { binloginfo.SetDDLBinlog(txn, job.ID, job.Query) err = d.finishDDLJob(t, job) @@ -315,17 +286,16 @@ func (d *ddl) handleDDLJobQueue() error { return errors.Trace(err) } - // running job may cost some time, so here we must update owner status to + // Running job may cost some time, so here we must update owner status to // prevent other become the owner. owner.LastUpdateTS = time.Now().UnixNano() err = t.SetDDLJobOwner(owner) - return errors.Trace(err) }) if err != nil { return errors.Trace(err) } else if job == nil { - // no job now, return and retry get later. + // No job now, return and retry getting later. return nil } @@ -333,13 +303,12 @@ func (d *ddl) handleDDLJobQueue() error { d.hook.OnJobUpdated(job) d.hookMu.Unlock() - // here means the job enters another state (delete only, write only, public, etc...) or is cancelled. - // if the job is done or still running, we will wait 2 * lease time to guarantee other servers to update + // Here means the job enters another state (delete only, write only, public, etc...) or is cancelled. + // If the job is done or still running, we will wait 2 * lease time to guarantee other servers to update // the newest schema. 
if job.State == model.JobRunning || job.State == model.JobDone { d.waitSchemaChanged(waitTime) } - if job.IsFinished() { d.startBgJob(job.Type) asyncNotify(d.ddlJobDoneCh) @@ -355,35 +324,9 @@ func chooseLeaseTime(n1 time.Duration, n2 time.Duration) time.Duration { return n2 } -// onDDLWorker is for async online schema change, it will try to become the owner first, -// then wait or pull the job queue to handle a schema change job. -func (d *ddl) onDDLWorker() { - defer d.wait.Done() - - // we use 4 * lease time to check owner's timeout, so here, we will update owner's status - // every 2 * lease time, if lease is 0, we will use default 10s. - checkTime := chooseLeaseTime(2*d.lease, 10*time.Second) - - ticker := time.NewTicker(checkTime) - defer ticker.Stop() - - for { - select { - case <-ticker.C: - log.Debugf("[ddl] wait %s to check DDL status again", checkTime) - case <-d.ddlJobCh: - case <-d.quitCh: - return - } - - err := d.handleDDLJobQueue() - if err != nil { - log.Errorf("[ddl] handle ddl job err %v", errors.ErrorStack(err)) - } - } -} - +// runDDLJob runs a DDL job. func (d *ddl) runDDLJob(t *meta.Meta, job *model.Job) { + log.Warnf("[ddl] run DDL job %v", job) if job.IsFinished() { return } @@ -417,14 +360,14 @@ func (d *ddl) runDDLJob(t *meta.Meta, job *model.Job) { case model.ActionTruncateTable: err = d.onTruncateTable(t, job) default: - // invalid job, cancel it. + // Invalid job, cancel it. job.State = model.JobCancelled err = errInvalidDDLJob.Gen("invalid ddl job %v", job) } - // saves error in job, so that others can know error happens. + // Save errors in job, so that others can know errors happened. if err != nil { - // if job is not cancelled, we should log this error. + // If job is not cancelled, we should log this error. if job.State != model.JobCancelled { log.Errorf("[ddl] run ddl job err %v", errors.ErrorStack(err)) } @@ -441,11 +384,11 @@ func toTError(err error) *terror.Error { return tErr } - // TODO: add the error code + // TODO: Add the error code. return terror.ClassDDL.New(terror.CodeUnknown, err.Error()) } -// for every lease seconds, we will re-update the whole schema, so we will wait 2 * lease time +// For every lease, we will re-update the whole schema, so we will wait 2 * lease time // to guarantee that all servers have already updated schema. func (d *ddl) waitSchemaChanged(waitTime time.Duration) { if waitTime == 0 { diff --git a/ddl/foreign_key.go b/ddl/foreign_key.go index 4166fa7605..a1b49615e7 100644 --- a/ddl/foreign_key.go +++ b/ddl/foreign_key.go @@ -50,7 +50,7 @@ func (d *ddl) onCreateForeignKey(t *meta.Meta, job *model.Job) error { if err != nil { return errors.Trace(err) } - // finish this job + // Finish this job. job.State = model.JobDone addTableHistoryInfo(job, ver, tblInfo) return nil @@ -112,7 +112,7 @@ func (d *ddl) onDropForeignKey(t *meta.Meta, job *model.Job) error { if err != nil { return errors.Trace(err) } - // finish this job + // Finish this job. job.State = model.JobDone addTableHistoryInfo(job, ver, tblInfo) return nil diff --git a/ddl/index.go b/ddl/index.go index 9f4e7cbb67..da5a45d05a 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -111,7 +111,7 @@ func dropIndexColumnFlag(tblInfo *model.TableInfo, indexInfo *model.IndexInfo) { } func (d *ddl) onCreateIndex(t *meta.Meta, job *model.Job) error { - // rollback job + // Handle rollback job. 
if job.State == model.JobRollback { err := d.onDropIndex(t, job) if err != nil { @@ -120,7 +120,7 @@ func (d *ddl) onCreateIndex(t *meta.Meta, job *model.Job) error { return nil } - // normal job + // Handle normal job. schemaID := job.SchemaID tblInfo, err := d.getTableInfo(t, job) if err != nil { @@ -143,11 +143,10 @@ func (d *ddl) onCreateIndex(t *meta.Meta, job *model.Job) error { for _, idx := range tblInfo.Indices { if idx.Name.L == indexName.L { if idx.State == model.StatePublic { - // we already have a index with same index name + // We already have an index with same index name. job.State = model.JobCancelled return infoschema.ErrIndexExists.Gen("CREATE INDEX: index already exist %s", indexName) } - indexInfo = idx break } @@ -184,7 +183,7 @@ func (d *ddl) onCreateIndex(t *meta.Meta, job *model.Job) error { // write only -> reorganization job.SchemaState = model.StateWriteReorganization indexInfo.State = model.StateWriteReorganization - // initialize SnapshotVer to 0 for later reorganization check. + // Initialize SnapshotVer to 0 for later reorganization check. job.SnapshotVer = 0 err = t.UpdateTable(schemaID, tblInfo) return errors.Trace(err) @@ -192,7 +191,7 @@ func (d *ddl) onCreateIndex(t *meta.Meta, job *model.Job) error { // reorganization -> public reorgInfo, err := d.getReorgInfo(t, job) if err != nil || reorgInfo.first { - // if we run reorg firstly, we should update the job snapshot version + // If we run reorg firstly, we should update the job snapshot version // and then run the reorg next time. return errors.Trace(err) } @@ -206,7 +205,6 @@ func (d *ddl) onCreateIndex(t *meta.Meta, job *model.Job) error { err = d.runReorgJob(func() error { return d.addTableIndex(tbl, indexInfo, reorgInfo, job) }) - if terror.ErrorEqual(err, errWaitReorgTimeout) { // if timeout, we should return, check for the owner and re-wait job done. return nil @@ -220,13 +218,13 @@ func (d *ddl) onCreateIndex(t *meta.Meta, job *model.Job) error { } indexInfo.State = model.StatePublic - // set column index flag. + // Set column index flag. addIndexColumnFlag(tblInfo, indexInfo) if err = t.UpdateTable(schemaID, tblInfo); err != nil { return errors.Trace(err) } - // finish this job + // Finish this job. job.SchemaState = model.StatePublic job.State = model.JobDone addTableHistoryInfo(job, ver, tblInfo) @@ -290,34 +288,31 @@ func (d *ddl) onDropIndex(t *meta.Meta, job *model.Job) error { job.SchemaState = model.StateWriteOnly indexInfo.State = model.StateWriteOnly err = t.UpdateTable(schemaID, tblInfo) - return errors.Trace(err) case model.StateWriteOnly: // write only -> delete only job.SchemaState = model.StateDeleteOnly indexInfo.State = model.StateDeleteOnly err = t.UpdateTable(schemaID, tblInfo) - return errors.Trace(err) case model.StateDeleteOnly: // delete only -> reorganization job.SchemaState = model.StateDeleteReorganization indexInfo.State = model.StateDeleteReorganization err = t.UpdateTable(schemaID, tblInfo) - return errors.Trace(err) case model.StateDeleteReorganization: // reorganization -> absent err = d.runReorgJob(func() error { return d.dropTableIndex(indexInfo, job) }) - if terror.ErrorEqual(err, errWaitReorgTimeout) { - // if timeout, we should return, check for the owner and re-wait job done. + // If the timeout happens, we should return. + // Then check for the owner and re-wait job to finish. return nil } if err != nil { return errors.Trace(err) } - // all reorganization jobs done, drop this index + // All reorganization jobs are done, drop this index. 
newIndices := make([]*model.IndexInfo, 0, len(tblInfo.Indices)) for _, idx := range tblInfo.Indices { if idx.Name.L != indexName.L { @@ -325,13 +320,13 @@ func (d *ddl) onDropIndex(t *meta.Meta, job *model.Job) error { } } tblInfo.Indices = newIndices - // set column index flag. + // Set column index flag. dropIndexColumnFlag(tblInfo, indexInfo) if err = t.UpdateTable(schemaID, tblInfo); err != nil { return errors.Trace(err) } - // finish this job + // Finish this job. job.SchemaState = model.StateNone if job.State == model.JobRollback { job.State = model.JobRollbackDone @@ -339,10 +334,10 @@ func (d *ddl) onDropIndex(t *meta.Meta, job *model.Job) error { job.State = model.JobDone } addTableHistoryInfo(job, ver, tblInfo) - return nil default: - return ErrInvalidTableState.Gen("invalid table state %v", tblInfo.State) + err = ErrInvalidTableState.Gen("invalid table state %v", tblInfo.State) } + return errors.Trace(err) } func fetchRowColVals(txn kv.Transaction, t table.Table, handle int64, indexInfo *model.IndexInfo) ( diff --git a/ddl/schema.go b/ddl/schema.go index bb87540baf..583824fd35 100644 --- a/ddl/schema.go +++ b/ddl/schema.go @@ -26,7 +26,7 @@ func (d *ddl) onCreateSchema(t *meta.Meta, job *model.Job) error { schemaID := job.SchemaID dbInfo := &model.DBInfo{} if err := job.DecodeArgs(dbInfo); err != nil { - // arg error, cancel this job. + // Invalid arguments, cancel this job. job.State = model.JobCancelled return errors.Trace(err) } @@ -42,7 +42,7 @@ func (d *ddl) onCreateSchema(t *meta.Meta, job *model.Job) error { for _, db := range dbs { if db.Name.L == dbInfo.Name.L { if db.ID != schemaID { - // database exists, can't create, we should cancel this job now. + // The database already exists, can't create it, we should cancel this job now. job.State = model.JobCancelled return errors.Trace(infoschema.ErrDatabaseExists) } @@ -64,12 +64,12 @@ func (d *ddl) onCreateSchema(t *meta.Meta, job *model.Job) error { if err != nil { return errors.Trace(err) } - // finish this job + // Finish this job. job.State = model.JobDone addDBHistoryInfo(job, ver, dbInfo) return nil default: - // we can't enter here. + // We can't enter here. return errors.Errorf("invalid db state %v", dbInfo.State) } } @@ -112,7 +112,7 @@ func (d *ddl) onDropSchema(t *meta.Meta, job *model.Job) error { break } - // finish this job + // Finish this job. addDBHistoryInfo(job, ver, dbInfo) if len(tables) > 0 { job.Args = append(job.Args, getIDs(tables)) @@ -120,7 +120,7 @@ func (d *ddl) onDropSchema(t *meta.Meta, job *model.Job) error { job.State = model.JobDone job.SchemaState = model.StateNone default: - // we can't enter here. + // We can't enter here. err = errors.Errorf("invalid db state %v", dbInfo.State) } @@ -152,7 +152,7 @@ func (d *ddl) delReorgSchema(t *meta.Meta, job *model.Job) error { return nil } - // finish this background job + // Finish this background job. job.SchemaState = model.StateNone job.State = model.JobDone diff --git a/ddl/table.go b/ddl/table.go index a7eea41acc..812f021e22 100644 --- a/ddl/table.go +++ b/ddl/table.go @@ -29,13 +29,14 @@ func (d *ddl) onCreateTable(t *meta.Meta, job *model.Job) error { schemaID := job.SchemaID tbInfo := &model.TableInfo{} if err := job.DecodeArgs(tbInfo); err != nil { - // arg error, cancel this job. + // Invalid arguments, cancel this job. job.State = model.JobCancelled return errors.Trace(err) } tbInfo.State = model.StateNone + // Check this table's database. 
tables, err := t.ListTables(schemaID) if terror.ErrorEqual(err, meta.ErrDBNotExists) { job.State = model.JobCancelled @@ -44,10 +45,11 @@ func (d *ddl) onCreateTable(t *meta.Meta, job *model.Job) error { return errors.Trace(err) } + // Check the table. for _, tbl := range tables { if tbl.Name.L == tbInfo.Name.L { if tbl.ID != tbInfo.ID { - // table exists, can't create, we should cancel this job now. + // This table already exists, can't create it, we should cancel this job now. job.State = model.JobCancelled return errors.Trace(infoschema.ErrTableExists) } @@ -70,7 +72,7 @@ func (d *ddl) onCreateTable(t *meta.Meta, job *model.Job) error { if err != nil { return errors.Trace(err) } - // finish this job + // Finish this job. job.State = model.JobDone addTableHistoryInfo(job, ver, tbInfo) return nil @@ -83,6 +85,7 @@ func (d *ddl) onDropTable(t *meta.Meta, job *model.Job) error { schemaID := job.SchemaID tableID := job.TableID + // Check this table's database. tblInfo, err := t.GetTable(schemaID, tableID) if terror.ErrorEqual(err, meta.ErrDBNotExists) { job.State = model.JobCancelled @@ -91,6 +94,7 @@ func (d *ddl) onDropTable(t *meta.Meta, job *model.Job) error { return errors.Trace(err) } + // Check the table. if tblInfo == nil { job.State = model.JobCancelled return errors.Trace(infoschema.ErrTableNotExists) @@ -118,7 +122,7 @@ func (d *ddl) onDropTable(t *meta.Meta, job *model.Job) error { if err = t.DropTable(job.SchemaID, job.TableID); err != nil { break } - // finish this job + // Finish this job. job.State = model.JobDone job.SchemaState = model.StateNone addTableHistoryInfo(job, ver, tblInfo) @@ -146,7 +150,7 @@ func (d *ddl) delReorgTable(t *meta.Meta, job *model.Job) error { if err != nil { return errors.Trace(err) } - // finish this background job + // Finish this background job. if delCount < limit { job.SchemaState = model.StateNone job.State = model.JobDone diff --git a/model/ddl.go b/model/ddl.go index 6365f07972..ef435b3f1d 100644 --- a/model/ddl.go +++ b/model/ddl.go @@ -78,19 +78,19 @@ type Job struct { TableID int64 `json:"table_id"` State JobState `json:"state"` Error *terror.Error `json:"err"` - // every time we meet an error when running job, we will increase it. + // Every time we meet an error when running job, we will increase it. ErrorCount int64 `json:"err_count"` - // the number of rows are processed. + // The number of rows that are processed. RowCount int64 `json:"row_count"` Mu sync.Mutex `json:"-"` Args []interface{} `json:"-"` - // we must use json raw message for delay parsing special args. + // We must use json raw message to delay parsing special args. RawArgs json.RawMessage `json:"raw_args"` SchemaState SchemaState `json:"schema_state"` - // snapshot version for this job. + // Snapshot version for this job. SnapshotVer uint64 `json:"snapshot_ver"` // unix nano seconds - // TODO: use timestamp allocated by TSO. + // TODO: Use timestamp allocated by TSO. LastUpdateTS int64 `json:"last_update_ts"` // Query string of the ddl job. Query string `json:"query"` @@ -203,7 +203,7 @@ func (s JobState) String() string { type Owner struct { OwnerID string `json:"owner_id"` // unix nano seconds - // TODO: use timestamp allocated by TSO + // TODO: Use timestamp allocated by TSO. LastUpdateTS int64 `json:"last_update_ts"` }
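
For orientation, this is the rough shape of a DDL entry point after the refactor, modeled on the CreateTable hunk above; the exact fields each entry point sets (for example TableID) and the callHookOnChanged wiring are not all visible in this diff, so treat it as a sketch rather than the definitive call sequence:

	job := &model.Job{
		SchemaID: schema.ID,
		TableID:  tbInfo.ID,
		Type:     model.ActionCreateTable,
		Args:     []interface{}{tbInfo},
	}
	// doDDLJob enqueues the job via addDDLJob, wakes the worker with asyncNotify(d.ddlJobCh),
	// and then blocks, polling getHistoryDDLJob until the job is done or cancelled.
	err = d.doDDLJob(ctx, job)
	err = d.callHookOnChanged(err)
	return errors.Trace(err)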