*: Clean up (#1928)

* ddl: remove useless code, add and update comments
zimulala
2016-11-03 11:40:14 +08:00
committed by GitHub
parent c74c4f80fd
commit b4e3e58693
9 changed files with 207 additions and 220 deletions

View File

@ -41,7 +41,7 @@ func (d *ddl) handleBgJobQueue() error {
return errors.Trace(err)
}
// get the first background job and run
// Get the first background job and run it.
job, err = d.getFirstBgJob(t)
if err != nil {
return errors.Trace(err)
@ -155,8 +155,8 @@ func (d *ddl) finishBgJob(t *meta.Meta, job *model.Job) error {
func (d *ddl) onBackgroundWorker() {
defer d.wait.Done()
// we use 4 * lease time to check owner's timeout, so here, we will update owner's status
// every 2 * lease time, if lease is 0, we will use default 10s.
// We use 4 * lease time to check owner's timeout, so here, we will update owner's status
// every 2 * lease time. If lease is 0, we will use default 10s.
checkTime := chooseLeaseTime(2*d.lease, 10*time.Second)
ticker := time.NewTicker(checkTime)
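
chooseLeaseTime itself is only partially visible later in this commit; judging from the comment above ("if lease is 0, we will use default 10s") and the fragment of its body shown in the worker file, it presumably falls back to the second argument when the lease is zero. A minimal sketch under that assumption:

func chooseLeaseTime(n1 time.Duration, n2 time.Duration) time.Duration {
    if n1 > 0 {
        return n1
    }
    // The lease is 0 (e.g. a local store), so fall back to the default check interval.
    return n2
}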

View File

@ -46,7 +46,7 @@ func (d *ddl) adjustColumnOffset(columns []*model.ColumnInfo, indices []*model.I
columns[offset].Offset = len(columns) - 1
}
// TODO: index can't cover the add/remove column with offset now, we may check this later.
// TODO: Index can't cover the add/remove column with offset now, we may check this later.
// Update index column offset info.
for _, idx := range indices {
@ -59,7 +59,7 @@ func (d *ddl) adjustColumnOffset(columns []*model.ColumnInfo, indices []*model.I
}
}
func (d *ddl) addColumn(tblInfo *model.TableInfo, colInfo *model.ColumnInfo, pos *ast.ColumnPosition) (*model.ColumnInfo, int, error) {
func (d *ddl) createColumnInfo(tblInfo *model.TableInfo, colInfo *model.ColumnInfo, pos *ast.ColumnPosition) (*model.ColumnInfo, int, error) {
// Check column name duplicate.
cols := tblInfo.Columns
position := len(cols)
@ -111,17 +111,16 @@ func (d *ddl) onAddColumn(t *meta.Meta, job *model.Job) error {
columnInfo := findCol(tblInfo.Columns, col.Name.L)
if columnInfo != nil {
if columnInfo.State == model.StatePublic {
// we already have a column with same column name
// We already have a column with the same column name.
job.State = model.JobCancelled
return infoschema.ErrColumnExists.Gen("ADD COLUMN: column already exist %s", col.Name.L)
}
} else {
columnInfo, offset, err = d.addColumn(tblInfo, col, pos)
columnInfo, offset, err = d.createColumnInfo(tblInfo, col, pos)
if err != nil {
job.State = model.JobCancelled
return errors.Trace(err)
}
// Set offset arg to job.
if offset != 0 {
job.Args = []interface{}{columnInfo, pos, offset}
@ -139,27 +138,24 @@ func (d *ddl) onAddColumn(t *meta.Meta, job *model.Job) error {
job.SchemaState = model.StateDeleteOnly
columnInfo.State = model.StateDeleteOnly
err = t.UpdateTable(schemaID, tblInfo)
return errors.Trace(err)
case model.StateDeleteOnly:
// delete only -> write only
job.SchemaState = model.StateWriteOnly
columnInfo.State = model.StateWriteOnly
err = t.UpdateTable(schemaID, tblInfo)
return errors.Trace(err)
case model.StateWriteOnly:
// write only -> reorganization
job.SchemaState = model.StateWriteReorganization
columnInfo.State = model.StateWriteReorganization
// initialize SnapshotVer to 0 for later reorganization check.
// Initialize SnapshotVer to 0 for later reorganization check.
job.SnapshotVer = 0
err = t.UpdateTable(schemaID, tblInfo)
return errors.Trace(err)
case model.StateWriteReorganization:
// reorganization -> public
// get the current version for reorganization if we don't have
// Get the current version for reorganization if we don't have it.
reorgInfo, err := d.getReorgInfo(t, job)
if err != nil || reorgInfo.first {
// if we run reorg firstly, we should update the job snapshot version
// If we run the reorg for the first time, we should update the job snapshot version
// and then run the reorg next time.
return errors.Trace(err)
}
@ -173,7 +169,8 @@ func (d *ddl) onAddColumn(t *meta.Meta, job *model.Job) error {
return d.addTableColumn(tbl, columnInfo, reorgInfo, job)
})
if terror.ErrorEqual(err, errWaitReorgTimeout) {
// if timeout, we should return, check for the owner and re-wait job done.
// If the timeout happens, we should return.
// Then check for the owner and re-wait for the job to finish.
return nil
}
if err != nil {
@ -183,21 +180,20 @@ func (d *ddl) onAddColumn(t *meta.Meta, job *model.Job) error {
// Adjust column offset.
d.adjustColumnOffset(tblInfo.Columns, tblInfo.Indices, offset, true)
columnInfo.State = model.StatePublic
if err = t.UpdateTable(schemaID, tblInfo); err != nil {
return errors.Trace(err)
}
// finish this job
// Finish this job.
job.SchemaState = model.StatePublic
job.State = model.JobDone
addTableHistoryInfo(job, ver, tblInfo)
return nil
default:
return ErrInvalidColumnState.Gen("invalid column state %v", columnInfo.State)
err = ErrInvalidColumnState.Gen("invalid column state %v", columnInfo.State)
}
return errors.Trace(err)
}
func (d *ddl) onDropColumn(t *meta.Meta, job *model.Job) error {
@ -226,8 +222,8 @@ func (d *ddl) onDropColumn(t *meta.Meta, job *model.Job) error {
colName, tblInfo.Name)
}
// we don't support drop column with index covered now.
// we must drop the index first, then drop the column.
// We don't support dropping a column that is covered by an index now.
// We must drop the index first, then drop the column.
for _, indexInfo := range tblInfo.Indices {
for _, col := range indexInfo.Columns {
if col.Name.L == colName.L {
@ -248,36 +244,31 @@ func (d *ddl) onDropColumn(t *meta.Meta, job *model.Job) error {
// public -> write only
job.SchemaState = model.StateWriteOnly
colInfo.State = model.StateWriteOnly
// set this column's offset to the last and reset all following columns' offset
// Set this column's offset to the last position and reset all following columns' offsets.
d.adjustColumnOffset(tblInfo.Columns, tblInfo.Indices, colInfo.Offset, false)
err = t.UpdateTable(schemaID, tblInfo)
return errors.Trace(err)
case model.StateWriteOnly:
// write only -> delete only
job.SchemaState = model.StateDeleteOnly
colInfo.State = model.StateDeleteOnly
err = t.UpdateTable(schemaID, tblInfo)
return errors.Trace(err)
case model.StateDeleteOnly:
// delete only -> reorganization
job.SchemaState = model.StateDeleteReorganization
colInfo.State = model.StateDeleteReorganization
// initialize SnapshotVer to 0 for later reorganization check.
// Initialize SnapshotVer to 0 for later reorganization check.
job.SnapshotVer = 0
err = t.UpdateTable(schemaID, tblInfo)
return errors.Trace(err)
case model.StateDeleteReorganization:
// reorganization -> absent
reorgInfo, err := d.getReorgInfo(t, job)
if err != nil || reorgInfo.first {
// if we run reorg firstly, we should update the job snapshot version
// If we run the reorg for the first time, we should update the job snapshot version
// and then run the reorg next time.
return errors.Trace(err)
}
// all reorganization jobs done, drop this column
// All reorganization jobs are done, drop this column.
newColumns := make([]*model.ColumnInfo, 0, len(tblInfo.Columns))
for _, col := range tblInfo.Columns {
if col.Name.L != colName.L {
@ -289,14 +280,14 @@ func (d *ddl) onDropColumn(t *meta.Meta, job *model.Job) error {
return errors.Trace(err)
}
// finish this job
// Finish this job.
job.SchemaState = model.StateNone
job.State = model.JobDone
addTableHistoryInfo(job, ver, tblInfo)
return nil
default:
return ErrInvalidTableState.Gen("invalid table state %v", tblInfo.State)
err = ErrInvalidTableState.Gen("invalid table state %v", tblInfo.State)
}
return errors.Trace(err)
}
// How to backfill column data in reorganization state?

View File

@ -54,7 +54,7 @@ var (
errWaitReorgTimeout = terror.ClassDDL.New(codeWaitReorgTimeout, "wait for reorganization timeout")
errInvalidStoreVer = terror.ClassDDL.New(codeInvalidStoreVer, "invalid storage current version")
// we don't support drop column with index covered now.
// We don't support dropping a column that is covered by an index now.
errCantDropColWithIndex = terror.ClassDDL.New(codeCantDropColWithIndex, "can't drop column with index")
errUnsupportedAddColumn = terror.ClassDDL.New(codeUnsupportedAddColumn, "unsupported add column")
@ -120,16 +120,16 @@ type ddl struct {
hook Callback
hookMu sync.RWMutex
store kv.Storage
// schema lease seconds.
// Schema lease seconds.
lease time.Duration
uuid string
ddlJobCh chan struct{}
ddlJobDoneCh chan struct{}
// drop database/table job runs in the background.
// Drop database/table job that runs in the background.
bgJobCh chan struct{}
// reorgDoneCh is for reorganization, if the reorganization job is done,
// we will use this channel to notify outer.
// TODO: now we use goroutine to simulate reorganization jobs, later we may
// TODO: Now we use goroutine to simulate reorganization jobs, later we may
// use a persistent job list.
reorgDoneCh chan error
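
reorgDoneCh pairs with the errWaitReorgTimeout handling seen in the column and index hunks: the reorganization work runs in a goroutine that reports on this channel, and the caller only waits a bounded time before giving up for this round. A rough, simplified sketch of that pattern (the timeout value and the exact structure are assumptions, not the actual implementation):

func (d *ddl) runReorgJob(f func() error) error {
    if d.reorgDoneCh == nil {
        // Start the reorganization work in a goroutine; it reports its result on the channel.
        d.reorgDoneCh = make(chan error, 1)
        go func(ch chan error) {
            ch <- f()
        }(d.reorgDoneCh)
    }
    // Assumed small bound: the outer worker waits 2 * lease and retries anyway.
    waitTimeout := 1 * time.Millisecond
    select {
    case err := <-d.reorgDoneCh:
        // The reorganization finished (or failed); clear the channel for the next job.
        d.reorgDoneCh = nil
        return errors.Trace(err)
    case <-d.quitCh:
        // The worker is closing; let the reorganization finish in the background.
        return nil
    case <-time.After(waitTimeout):
        // Not done yet; the caller returns, re-checks ownership, and waits for the job again.
        return errWaitReorgTimeout
    }
}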
@ -181,7 +181,7 @@ func (d *ddl) Stop() error {
return nil
}
// ddl job's owner is me, clean it so other servers can compete for it quickly.
// DDL job's owner is me, clean it so other servers can compete for it quickly.
return t.SetDDLJobOwner(&model.Owner{})
})
if err != nil {
@ -198,7 +198,7 @@ func (d *ddl) Stop() error {
return nil
}
// background job's owner is me, clean it so other servers can compete for it quickly.
// Background job's owner is me, clean it so other servers can compete for it quickly.
return t.SetBgJobOwner(&model.Owner{})
})
@ -223,8 +223,8 @@ func (d *ddl) start() {
d.wait.Add(2)
go d.onBackgroundWorker()
go d.onDDLWorker()
// for every start, we will send a fake job to let worker
// check owner first and try to find whether a job exists and run.
// For every start, we will send a fake job to let the worker
// check the owner first and try to find whether a job exists and run it.
asyncNotify(d.ddlJobCh)
asyncNotify(d.bgJobCh)
}
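
asyncNotify is a non-blocking send, so start() and doDDLJob never block when the worker already has a pending notification; its select/case is visible further down in the worker file, and the default branch is the assumed completion. A minimal sketch:

func asyncNotify(ch chan struct{}) {
    select {
    case ch <- struct{}{}:
    default:
        // The worker already has a pending notification; dropping this one is fine.
    }
}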
@ -259,12 +259,12 @@ func (d *ddl) SetLease(lease time.Duration) {
log.Warnf("[ddl] change schema lease %s -> %s", d.lease, lease)
if d.isClosed() {
// if already closed, just set lease and return
// If already closed, just set lease and return.
d.lease = lease
return
}
// close the running worker and start again
// Close the running worker and start again.
d.close()
d.lease = lease
d.start()
@ -361,7 +361,7 @@ func checkTooLongTable(table model.CIStr) error {
func getDefaultCharsetAndCollate() (string, string) {
// TODO: TableDefaultCharset-->DatabaseDefaultCharset-->SystemDefaultCharset.
// TODO: change TableOption parser to parse collate.
// TODO: Change TableOption parser to parse collate.
// This is a tmp solution.
return "utf8", "utf8_unicode_ci"
}
@ -372,7 +372,6 @@ func setColumnFlagWithConstraint(colMap map[string]*table.Column, v *ast.Constra
for _, key := range v.Keys {
c, ok := colMap[key.Column.Name.L]
if !ok {
// TODO: table constraint on unknown column.
continue
}
c.Flag |= mysql.PriKeyFlag
@ -383,7 +382,6 @@ func setColumnFlagWithConstraint(colMap map[string]*table.Column, v *ast.Constra
for i, key := range v.Keys {
c, ok := colMap[key.Column.Name.L]
if !ok {
// TODO: table constraint on unknown column.
continue
}
if i == 0 {
@ -402,7 +400,6 @@ func setColumnFlagWithConstraint(colMap map[string]*table.Column, v *ast.Constra
for i, key := range v.Keys {
c, ok := colMap[key.Column.Name.L]
if !ok {
// TODO: table constraint on unknown column.
continue
}
if i == 0 {
@ -427,7 +424,7 @@ func (d *ddl) buildColumnsAndConstraints(ctx context.Context, colDefs []*ast.Col
cols = append(cols, col)
colMap[colDef.Name.Name.L] = col
}
// traverse table Constraints and set col.flag
// Traverse table Constraints and set col.flag.
for _, v := range constraints {
setColumnFlagWithConstraint(colMap, v)
}
@ -878,7 +875,6 @@ func (d *ddl) CreateTable(ctx context.Context, ident ast.Ident, colDefs []*ast.C
Type: model.ActionCreateTable,
Args: []interface{}{tbInfo},
}
// Handle Table Options
d.handleTableOptions(options, tbInfo, schema.ID)
err = d.doDDLJob(ctx, job)
@ -927,7 +923,7 @@ func (d *ddl) handleTableOptions(options []*ast.TableOption, tbInfo *model.Table
}
func (d *ddl) AlterTable(ctx context.Context, ident ast.Ident, specs []*ast.AlterTableSpec) (err error) {
// now we only allow one schema changes at the same time.
// Now we only allow one schema change at a time.
if len(specs) != 1 {
return errRunMultiSchemaChanges
}
@ -950,12 +946,12 @@ func (d *ddl) AlterTable(ctx context.Context, ident ast.Ident, specs []*ast.Alte
case ast.ConstraintForeignKey:
err = d.CreateForeignKey(ctx, ident, model.NewCIStr(constr.Name), spec.Constraint.Keys, spec.Constraint.Refer)
default:
// nothing to do now.
// Nothing to do now.
}
case ast.AlterTableDropForeignKey:
err = d.DropForeignKey(ctx, ident, model.NewCIStr(spec.Name))
default:
// nothing to do now.
// Nothing to do now.
}
if err != nil {
@ -1007,8 +1003,8 @@ func (d *ddl) AddColumn(ctx context.Context, ti ast.Ident, spec *ast.AlterTableS
return ErrTooLongIdent.Gen("too long column %s", colName)
}
// ingore table constraints now, maybe return error later
// we use length(t.Cols()) as the default offset first, later we will change the
// Ignore table constraints now, maybe return an error later.
// We use the length of t.Cols() as the default offset first, and we will change the
// column's offset later.
col, _, err = d.buildColumnAndConstraint(ctx, len(t.Cols()), spec.Column)
if err != nil {
@ -1241,6 +1237,64 @@ func (d *ddl) DropIndex(ctx context.Context, ti ast.Ident, indexName model.CIStr
return errors.Trace(err)
}
func (d *ddl) doDDLJob(ctx context.Context, job *model.Job) error {
// For every DDL, we must commit current transaction.
if err := ctx.CommitTxn(); err != nil {
return errors.Trace(err)
}
// Get a global job ID and put the DDL job in the queue.
err := d.addDDLJob(ctx, job)
if err != nil {
return errors.Trace(err)
}
// Notify the worker that we push a new job, and wait for the job to be done.
asyncNotify(d.ddlJobCh)
log.Warnf("[ddl] start DDL job %v", job)
var historyJob *model.Job
jobID := job.ID
// For a job from start to end, its state will be: none -> delete only -> write only -> reorganization -> public.
// For every state change, we will wait 2 * lease time, so here the ticker check interval is 10 * lease.
ticker := time.NewTicker(chooseLeaseTime(10*d.lease, 10*time.Second))
startTime := time.Now()
jobsGauge.WithLabelValues(JobType(ddlJobFlag).String(), job.Type.String()).Inc()
defer func() {
ticker.Stop()
jobsGauge.WithLabelValues(JobType(ddlJobFlag).String(), job.Type.String()).Dec()
retLabel := handleJobSucc
if err != nil {
retLabel = handleJobFailed
}
handleJobHistogram.WithLabelValues(JobType(ddlJobFlag).String(), job.Type.String(),
retLabel).Observe(time.Since(startTime).Seconds())
}()
for {
select {
case <-d.ddlJobDoneCh:
case <-ticker.C:
}
historyJob, err = d.getHistoryDDLJob(jobID)
if err != nil {
log.Errorf("[ddl] get history DDL job err %v, check again", err)
continue
} else if historyJob == nil {
log.Warnf("[ddl] DDL job %d is not in history, maybe not run", jobID)
continue
}
// If a job is in the history queue, its state must be JobDone or JobCancelled.
if historyJob.State == model.JobDone {
log.Warnf("[ddl] DDL job %d is finished", jobID)
return nil
}
return errors.Trace(historyJob.Error)
}
}
func (d *ddl) callHookOnChanged(err error) error {
d.hookMu.Lock()
defer d.hookMu.Unlock()

View File

@ -26,94 +26,34 @@ import (
"github.com/pingcap/tidb/terror"
)
func (d *ddl) doDDLJob(ctx context.Context, job *model.Job) error {
// for every DDL, we must commit current transaction.
if err := ctx.CommitTxn(); err != nil {
return errors.Trace(err)
}
var startTS uint64
err := kv.RunInNewTxn(d.store, true, func(txn kv.Transaction) error {
t := meta.NewMeta(txn)
var err error
job.ID, err = t.GenGlobalID()
startTS = txn.StartTS()
return errors.Trace(err)
})
if err != nil {
return errors.Trace(err)
}
ddlQuery, _ := ctx.Value(context.QueryString).(string)
job.Query = ddlQuery
// onDDLWorker is for async online schema changing; it will try to become the owner first,
// then wait for or pull from the job queue to handle a schema change job.
func (d *ddl) onDDLWorker() {
defer d.wait.Done()
// Create a new job and queue it.
err = kv.RunInNewTxn(d.store, true, func(txn kv.Transaction) error {
t := meta.NewMeta(txn)
err1 := t.EnQueueDDLJob(job)
return errors.Trace(err1)
})
if err != nil {
return errors.Trace(err)
}
// We use 4 * lease time to check owner's timeout, so here, we will update owner's status
// every 2 * lease time. If lease is 0, we will use default 10s.
checkTime := chooseLeaseTime(2*d.lease, 10*time.Second)
// notice worker that we push a new job and wait the job done.
asyncNotify(d.ddlJobCh)
ticker := time.NewTicker(checkTime)
defer ticker.Stop()
log.Warnf("[ddl] start DDL job %v", job)
var historyJob *model.Job
jobID := job.ID
// for a job from start to end, the state of it will be none -> delete only -> write only -> reorganization -> public
// for every state changes, we will wait as lease 2 * lease time, so here the ticker check is 10 * lease.
ticker := time.NewTicker(chooseLeaseTime(10*d.lease, 10*time.Second))
startTime := time.Now()
jobsGauge.WithLabelValues(JobType(ddlJobFlag).String(), job.Type.String()).Inc()
defer func() {
ticker.Stop()
jobsGauge.WithLabelValues(JobType(ddlJobFlag).String(), job.Type.String()).Dec()
retLabel := handleJobSucc
if err != nil {
retLabel = handleJobFailed
}
handleJobHistogram.WithLabelValues(JobType(ddlJobFlag).String(), job.Type.String(),
retLabel).Observe(time.Since(startTime).Seconds())
}()
for {
select {
case <-d.ddlJobDoneCh:
case <-ticker.C:
log.Debugf("[ddl] wait %s to check DDL status again", checkTime)
case <-d.ddlJobCh:
case <-d.quitCh:
return
}
historyJob, err = d.getHistoryDDLJob(jobID)
err := d.handleDDLJobQueue()
if err != nil {
log.Errorf("[ddl] get history DDL job err %v, check again", err)
continue
} else if historyJob == nil {
log.Warnf("[ddl] DDL job %d is not in history, maybe not run", jobID)
continue
log.Errorf("[ddl] handle ddl job err %v", errors.ErrorStack(err))
}
// if a job is a history table, the state must be JobDone or JobCancel.
if historyJob.State == model.JobDone {
return nil
}
return errors.Trace(historyJob.Error)
}
}
func (d *ddl) getHistoryDDLJob(id int64) (*model.Job, error) {
var job *model.Job
err := kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error {
t := meta.NewMeta(txn)
var err1 error
job, err1 = t.GetHistoryDDLJob(id)
return errors.Trace(err1)
})
return job, errors.Trace(err)
}
func asyncNotify(ch chan struct{}) {
select {
case ch <- struct{}{}:
@ -208,20 +148,41 @@ func (d *ddl) getJobOwner(t *meta.Meta, flag JobType) (*model.Owner, error) {
return owner, errors.Trace(err)
}
// addDDLJob gets a global job ID and puts the DDL job in the DDL queue.
func (d *ddl) addDDLJob(ctx context.Context, job *model.Job) error {
job.Query, _ = ctx.Value(context.QueryString).(string)
return kv.RunInNewTxn(d.store, true, func(txn kv.Transaction) error {
t := meta.NewMeta(txn)
var err error
job.ID, err = t.GenGlobalID()
if err != nil {
return errors.Trace(err)
}
err = t.EnQueueDDLJob(job)
return errors.Trace(err)
})
}
// getFirstDDLJob gets the first DDL job from the DDL queue.
func (d *ddl) getFirstDDLJob(t *meta.Meta) (*model.Job, error) {
job, err := t.GetDDLJob(0)
return job, errors.Trace(err)
}
// every time we enter another state except final state, we must call this function.
// updateDDLJob updates the DDL job information.
// Every time we enter another state except final state, we must call this function.
func (d *ddl) updateDDLJob(t *meta.Meta, job *model.Job) error {
err := t.UpdateDDLJob(0, job)
return errors.Trace(err)
}
// finishDDLJob deletes the finished DDL job from the DDL queue and puts it into the history queue.
// If the DDL job needs to be handled in the background, it will prepare a background job.
func (d *ddl) finishDDLJob(t *meta.Meta, job *model.Job) error {
log.Warnf("[ddl] finish DDL job %v", job)
// done, notice and run next job.
log.Infof("[ddl] finish DDL job %v", job)
// The job is finished; notify and run the next job.
_, err := t.DeQueueDDLJob()
if err != nil {
return errors.Trace(err)
@ -237,6 +198,20 @@ func (d *ddl) finishDDLJob(t *meta.Meta, job *model.Job) error {
return errors.Trace(err)
}
// getHistoryDDLJob gets a DDL job by its ID from the history queue.
func (d *ddl) getHistoryDDLJob(id int64) (*model.Job, error) {
var job *model.Job
err := kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error {
t := meta.NewMeta(txn)
var err1 error
job, err1 = t.GetHistoryDDLJob(id)
return errors.Trace(err1)
})
return job, errors.Trace(err)
}
// JobType is job type, including ddl/background.
type JobType int
@ -268,24 +243,23 @@ func (d *ddl) handleDDLJobQueue() error {
t := meta.NewMeta(txn)
owner, err := d.checkOwner(t, ddlJobFlag)
if terror.ErrorEqual(err, errNotOwner) {
// we are not owner, return and retry checking later.
// We are not the owner; return and retry checking later.
return nil
} else if err != nil {
return errors.Trace(err)
}
// become the owner
// get the first job and run
// We become the owner. Get the first job and run it.
job, err = d.getFirstDDLJob(t)
if job == nil || err != nil {
return errors.Trace(err)
}
if job.IsRunning() {
// if we enter a new state, crash when waiting 2 * lease time, and restart quickly,
// If we enter a new state, crash when waiting 2 * lease time, and restart quickly,
// we may run the job immediately again, but we don't wait enough 2 * lease time to
// let other servers update the schema.
// so here we must check the elapsed time from last update, if < 2 * lease, we must
// So here we must check the elapsed time since the last update; if it is less than 2 * lease, we must
// wait again.
elapsed := time.Duration(time.Now().UnixNano() - job.LastUpdateTS)
if elapsed > 0 && elapsed < waitTime {
@ -295,16 +269,13 @@ func (d *ddl) handleDDLJobQueue() error {
}
}
log.Warnf("[ddl] run DDL job %v", job)
d.hookMu.Lock()
d.hook.OnJobRunBefore(job)
d.hookMu.Unlock()
// if run job meets error, we will save this error in job Error
// If running the job meets an error, we will save this error in the job's Error field
// and retry later if the job is not cancelled.
d.runDDLJob(t, job)
if job.IsFinished() {
binloginfo.SetDDLBinlog(txn, job.ID, job.Query)
err = d.finishDDLJob(t, job)
@ -315,17 +286,16 @@ func (d *ddl) handleDDLJobQueue() error {
return errors.Trace(err)
}
// running job may cost some time, so here we must update owner status to
// Running a job may cost some time, so here we must update the owner status to
// prevent others from becoming the owner.
owner.LastUpdateTS = time.Now().UnixNano()
err = t.SetDDLJobOwner(owner)
return errors.Trace(err)
})
if err != nil {
return errors.Trace(err)
} else if job == nil {
// no job now, return and retry get later.
// No job now; return and retry getting one later.
return nil
}
@ -333,13 +303,12 @@ func (d *ddl) handleDDLJobQueue() error {
d.hook.OnJobUpdated(job)
d.hookMu.Unlock()
// here means the job enters another state (delete only, write only, public, etc...) or is cancelled.
// if the job is done or still running, we will wait 2 * lease time to guarantee other servers to update
// Here the job has entered another state (delete only, write only, public, etc.) or has been cancelled.
// If the job is done or still running, we will wait 2 * lease time to guarantee that other servers update to
// the newest schema.
if job.State == model.JobRunning || job.State == model.JobDone {
d.waitSchemaChanged(waitTime)
}
if job.IsFinished() {
d.startBgJob(job.Type)
asyncNotify(d.ddlJobDoneCh)
@ -355,35 +324,9 @@ func chooseLeaseTime(n1 time.Duration, n2 time.Duration) time.Duration {
return n2
}
// onDDLWorker is for async online schema change, it will try to become the owner first,
// then wait or pull the job queue to handle a schema change job.
func (d *ddl) onDDLWorker() {
defer d.wait.Done()
// we use 4 * lease time to check owner's timeout, so here, we will update owner's status
// every 2 * lease time, if lease is 0, we will use default 10s.
checkTime := chooseLeaseTime(2*d.lease, 10*time.Second)
ticker := time.NewTicker(checkTime)
defer ticker.Stop()
for {
select {
case <-ticker.C:
log.Debugf("[ddl] wait %s to check DDL status again", checkTime)
case <-d.ddlJobCh:
case <-d.quitCh:
return
}
err := d.handleDDLJobQueue()
if err != nil {
log.Errorf("[ddl] handle ddl job err %v", errors.ErrorStack(err))
}
}
}
// runDDLJob runs a DDL job.
func (d *ddl) runDDLJob(t *meta.Meta, job *model.Job) {
log.Warnf("[ddl] run DDL job %v", job)
if job.IsFinished() {
return
}
@ -417,14 +360,14 @@ func (d *ddl) runDDLJob(t *meta.Meta, job *model.Job) {
case model.ActionTruncateTable:
err = d.onTruncateTable(t, job)
default:
// invalid job, cancel it.
// Invalid job, cancel it.
job.State = model.JobCancelled
err = errInvalidDDLJob.Gen("invalid ddl job %v", job)
}
// saves error in job, so that others can know error happens.
// Save the error in the job, so that others can know an error happened.
if err != nil {
// if job is not cancelled, we should log this error.
// If job is not cancelled, we should log this error.
if job.State != model.JobCancelled {
log.Errorf("[ddl] run ddl job err %v", errors.ErrorStack(err))
}
@ -441,11 +384,11 @@ func toTError(err error) *terror.Error {
return tErr
}
// TODO: add the error code
// TODO: Add the error code.
return terror.ClassDDL.New(terror.CodeUnknown, err.Error())
}
// for every lease seconds, we will re-update the whole schema, so we will wait 2 * lease time
// Every lease interval, we re-update the whole schema, so we will wait 2 * lease time
// to guarantee that all servers have already updated the schema.
func (d *ddl) waitSchemaChanged(waitTime time.Duration) {
if waitTime == 0 {

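The rest of waitSchemaChanged is cut off by this hunk; given the comment above and the quitCh pattern used by the workers, it presumably just sleeps for waitTime (2 * lease) unless the ddl is shutting down. A minimal sketch under that assumption:

func (d *ddl) waitSchemaChanged(waitTime time.Duration) {
    if waitTime == 0 {
        return
    }
    // Wait so that every server has had time to reload the newest schema,
    // but return early if the ddl worker is being closed.
    select {
    case <-time.After(waitTime):
    case <-d.quitCh:
    }
}
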
View File

@ -50,7 +50,7 @@ func (d *ddl) onCreateForeignKey(t *meta.Meta, job *model.Job) error {
if err != nil {
return errors.Trace(err)
}
// finish this job
// Finish this job.
job.State = model.JobDone
addTableHistoryInfo(job, ver, tblInfo)
return nil
@ -112,7 +112,7 @@ func (d *ddl) onDropForeignKey(t *meta.Meta, job *model.Job) error {
if err != nil {
return errors.Trace(err)
}
// finish this job
// Finish this job.
job.State = model.JobDone
addTableHistoryInfo(job, ver, tblInfo)
return nil

View File

@ -111,7 +111,7 @@ func dropIndexColumnFlag(tblInfo *model.TableInfo, indexInfo *model.IndexInfo) {
}
func (d *ddl) onCreateIndex(t *meta.Meta, job *model.Job) error {
// rollback job
// Handle rollback job.
if job.State == model.JobRollback {
err := d.onDropIndex(t, job)
if err != nil {
@ -120,7 +120,7 @@ func (d *ddl) onCreateIndex(t *meta.Meta, job *model.Job) error {
return nil
}
// normal job
// Handle normal job.
schemaID := job.SchemaID
tblInfo, err := d.getTableInfo(t, job)
if err != nil {
@ -143,11 +143,10 @@ func (d *ddl) onCreateIndex(t *meta.Meta, job *model.Job) error {
for _, idx := range tblInfo.Indices {
if idx.Name.L == indexName.L {
if idx.State == model.StatePublic {
// we already have a index with same index name
// We already have an index with the same index name.
job.State = model.JobCancelled
return infoschema.ErrIndexExists.Gen("CREATE INDEX: index already exist %s", indexName)
}
indexInfo = idx
break
}
@ -184,7 +183,7 @@ func (d *ddl) onCreateIndex(t *meta.Meta, job *model.Job) error {
// write only -> reorganization
job.SchemaState = model.StateWriteReorganization
indexInfo.State = model.StateWriteReorganization
// initialize SnapshotVer to 0 for later reorganization check.
// Initialize SnapshotVer to 0 for later reorganization check.
job.SnapshotVer = 0
err = t.UpdateTable(schemaID, tblInfo)
return errors.Trace(err)
@ -192,7 +191,7 @@ func (d *ddl) onCreateIndex(t *meta.Meta, job *model.Job) error {
// reorganization -> public
reorgInfo, err := d.getReorgInfo(t, job)
if err != nil || reorgInfo.first {
// if we run reorg firstly, we should update the job snapshot version
// If we run the reorg for the first time, we should update the job snapshot version
// and then run the reorg next time.
return errors.Trace(err)
}
@ -206,7 +205,6 @@ func (d *ddl) onCreateIndex(t *meta.Meta, job *model.Job) error {
err = d.runReorgJob(func() error {
return d.addTableIndex(tbl, indexInfo, reorgInfo, job)
})
if terror.ErrorEqual(err, errWaitReorgTimeout) {
// if timeout, we should return, check for the owner and re-wait job done.
return nil
@ -220,13 +218,13 @@ func (d *ddl) onCreateIndex(t *meta.Meta, job *model.Job) error {
}
indexInfo.State = model.StatePublic
// set column index flag.
// Set column index flag.
addIndexColumnFlag(tblInfo, indexInfo)
if err = t.UpdateTable(schemaID, tblInfo); err != nil {
return errors.Trace(err)
}
// finish this job
// Finish this job.
job.SchemaState = model.StatePublic
job.State = model.JobDone
addTableHistoryInfo(job, ver, tblInfo)
@ -290,34 +288,31 @@ func (d *ddl) onDropIndex(t *meta.Meta, job *model.Job) error {
job.SchemaState = model.StateWriteOnly
indexInfo.State = model.StateWriteOnly
err = t.UpdateTable(schemaID, tblInfo)
return errors.Trace(err)
case model.StateWriteOnly:
// write only -> delete only
job.SchemaState = model.StateDeleteOnly
indexInfo.State = model.StateDeleteOnly
err = t.UpdateTable(schemaID, tblInfo)
return errors.Trace(err)
case model.StateDeleteOnly:
// delete only -> reorganization
job.SchemaState = model.StateDeleteReorganization
indexInfo.State = model.StateDeleteReorganization
err = t.UpdateTable(schemaID, tblInfo)
return errors.Trace(err)
case model.StateDeleteReorganization:
// reorganization -> absent
err = d.runReorgJob(func() error {
return d.dropTableIndex(indexInfo, job)
})
if terror.ErrorEqual(err, errWaitReorgTimeout) {
// if timeout, we should return, check for the owner and re-wait job done.
// If the timeout happens, we should return.
// Then check for the owner and re-wait for the job to finish.
return nil
}
if err != nil {
return errors.Trace(err)
}
// all reorganization jobs done, drop this index
// All reorganization jobs are done, drop this index.
newIndices := make([]*model.IndexInfo, 0, len(tblInfo.Indices))
for _, idx := range tblInfo.Indices {
if idx.Name.L != indexName.L {
@ -325,13 +320,13 @@ func (d *ddl) onDropIndex(t *meta.Meta, job *model.Job) error {
}
}
tblInfo.Indices = newIndices
// set column index flag.
// Set column index flag.
dropIndexColumnFlag(tblInfo, indexInfo)
if err = t.UpdateTable(schemaID, tblInfo); err != nil {
return errors.Trace(err)
}
// finish this job
// Finish this job.
job.SchemaState = model.StateNone
if job.State == model.JobRollback {
job.State = model.JobRollbackDone
@ -339,10 +334,10 @@ func (d *ddl) onDropIndex(t *meta.Meta, job *model.Job) error {
job.State = model.JobDone
}
addTableHistoryInfo(job, ver, tblInfo)
return nil
default:
return ErrInvalidTableState.Gen("invalid table state %v", tblInfo.State)
err = ErrInvalidTableState.Gen("invalid table state %v", tblInfo.State)
}
return errors.Trace(err)
}
func fetchRowColVals(txn kv.Transaction, t table.Table, handle int64, indexInfo *model.IndexInfo) (

View File

@ -26,7 +26,7 @@ func (d *ddl) onCreateSchema(t *meta.Meta, job *model.Job) error {
schemaID := job.SchemaID
dbInfo := &model.DBInfo{}
if err := job.DecodeArgs(dbInfo); err != nil {
// arg error, cancel this job.
// Invalid arguments, cancel this job.
job.State = model.JobCancelled
return errors.Trace(err)
}
@ -42,7 +42,7 @@ func (d *ddl) onCreateSchema(t *meta.Meta, job *model.Job) error {
for _, db := range dbs {
if db.Name.L == dbInfo.Name.L {
if db.ID != schemaID {
// database exists, can't create, we should cancel this job now.
// The database already exists, so we can't create it and should cancel this job now.
job.State = model.JobCancelled
return errors.Trace(infoschema.ErrDatabaseExists)
}
@ -64,12 +64,12 @@ func (d *ddl) onCreateSchema(t *meta.Meta, job *model.Job) error {
if err != nil {
return errors.Trace(err)
}
// finish this job
// Finish this job.
job.State = model.JobDone
addDBHistoryInfo(job, ver, dbInfo)
return nil
default:
// we can't enter here.
// We can't enter here.
return errors.Errorf("invalid db state %v", dbInfo.State)
}
}
@ -112,7 +112,7 @@ func (d *ddl) onDropSchema(t *meta.Meta, job *model.Job) error {
break
}
// finish this job
// Finish this job.
addDBHistoryInfo(job, ver, dbInfo)
if len(tables) > 0 {
job.Args = append(job.Args, getIDs(tables))
@ -120,7 +120,7 @@ func (d *ddl) onDropSchema(t *meta.Meta, job *model.Job) error {
job.State = model.JobDone
job.SchemaState = model.StateNone
default:
// we can't enter here.
// We can't enter here.
err = errors.Errorf("invalid db state %v", dbInfo.State)
}
@ -152,7 +152,7 @@ func (d *ddl) delReorgSchema(t *meta.Meta, job *model.Job) error {
return nil
}
// finish this background job
// Finish this background job.
job.SchemaState = model.StateNone
job.State = model.JobDone

View File

@ -29,13 +29,14 @@ func (d *ddl) onCreateTable(t *meta.Meta, job *model.Job) error {
schemaID := job.SchemaID
tbInfo := &model.TableInfo{}
if err := job.DecodeArgs(tbInfo); err != nil {
// arg error, cancel this job.
// Invalid arguments, cancel this job.
job.State = model.JobCancelled
return errors.Trace(err)
}
tbInfo.State = model.StateNone
// Check this table's database.
tables, err := t.ListTables(schemaID)
if terror.ErrorEqual(err, meta.ErrDBNotExists) {
job.State = model.JobCancelled
@ -44,10 +45,11 @@ func (d *ddl) onCreateTable(t *meta.Meta, job *model.Job) error {
return errors.Trace(err)
}
// Check the table.
for _, tbl := range tables {
if tbl.Name.L == tbInfo.Name.L {
if tbl.ID != tbInfo.ID {
// table exists, can't create, we should cancel this job now.
// This table already exists, so we can't create it and should cancel this job now.
job.State = model.JobCancelled
return errors.Trace(infoschema.ErrTableExists)
}
@ -70,7 +72,7 @@ func (d *ddl) onCreateTable(t *meta.Meta, job *model.Job) error {
if err != nil {
return errors.Trace(err)
}
// finish this job
// Finish this job.
job.State = model.JobDone
addTableHistoryInfo(job, ver, tbInfo)
return nil
@ -83,6 +85,7 @@ func (d *ddl) onDropTable(t *meta.Meta, job *model.Job) error {
schemaID := job.SchemaID
tableID := job.TableID
// Check this table's database.
tblInfo, err := t.GetTable(schemaID, tableID)
if terror.ErrorEqual(err, meta.ErrDBNotExists) {
job.State = model.JobCancelled
@ -91,6 +94,7 @@ func (d *ddl) onDropTable(t *meta.Meta, job *model.Job) error {
return errors.Trace(err)
}
// Check the table.
if tblInfo == nil {
job.State = model.JobCancelled
return errors.Trace(infoschema.ErrTableNotExists)
@ -118,7 +122,7 @@ func (d *ddl) onDropTable(t *meta.Meta, job *model.Job) error {
if err = t.DropTable(job.SchemaID, job.TableID); err != nil {
break
}
// finish this job
// Finish this job.
job.State = model.JobDone
job.SchemaState = model.StateNone
addTableHistoryInfo(job, ver, tblInfo)
@ -146,7 +150,7 @@ func (d *ddl) delReorgTable(t *meta.Meta, job *model.Job) error {
if err != nil {
return errors.Trace(err)
}
// finish this background job
// Finish this background job.
if delCount < limit {
job.SchemaState = model.StateNone
job.State = model.JobDone

View File

@ -78,19 +78,19 @@ type Job struct {
TableID int64 `json:"table_id"`
State JobState `json:"state"`
Error *terror.Error `json:"err"`
// every time we meet an error when running job, we will increase it.
// Every time we meet an error when running the job, we will increase it.
ErrorCount int64 `json:"err_count"`
// the number of rows are processed.
// The number of rows that are processed.
RowCount int64 `json:"row_count"`
Mu sync.Mutex `json:"-"`
Args []interface{} `json:"-"`
// we must use json raw message for delay parsing special args.
// We must use json raw message to delay parsing special args.
RawArgs json.RawMessage `json:"raw_args"`
SchemaState SchemaState `json:"schema_state"`
// snapshot version for this job.
// Snapshot version for this job.
SnapshotVer uint64 `json:"snapshot_ver"`
// unix nano seconds
// TODO: use timestamp allocated by TSO.
// TODO: Use timestamp allocated by TSO.
LastUpdateTS int64 `json:"last_update_ts"`
// Query string of the ddl job.
Query string `json:"query"`
@ -203,7 +203,7 @@ func (s JobState) String() string {
type Owner struct {
OwnerID string `json:"owner_id"`
// unix nano seconds
// TODO: use timestamp allocated by TSO
// TODO: Use timestamp allocated by TSO.
LastUpdateTS int64 `json:"last_update_ts"`
}
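
The Args/RawArgs split above is what makes the job.DecodeArgs(...) calls in the schema and table hunks work: Args is not serialized directly; the raw JSON is kept in RawArgs and decoded into typed pointers only when a worker picks the job up. A rough sketch of that decoding step (assumed, not the exact implementation):

// DecodeArgs unmarshals RawArgs into the given pointers, e.g. job.DecodeArgs(tbInfo).
func (job *Job) DecodeArgs(args ...interface{}) error {
    job.Args = args
    return errors.Trace(json.Unmarshal(job.RawArgs, &job.Args))
}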