ddl: add tasks

This commit is contained in:
xia
2016-01-06 12:42:03 +08:00
parent dcb5be892f
commit 8ecaf67d9a
2 changed files with 34 additions and 8 deletions

View File

@ -74,6 +74,35 @@ func (d *ddl) onCreateSchema(t *meta.Meta, job *model.Job) error {
}
}
func (d *ddl) delReorgSchema(t *meta.Meta, task *model.Job) error {
// wait reorganization jobs done and drop meta.
tables, err := t.ListTables(dbInfo.ID)
if err != nil {
return errors.Trace(err)
}
err = d.runReorgJob(func() error {
return d.dropSchemaData(dbInfo, tables)
})
if terror.ErrorEqual(err, errWaitReorgTimeout) {
// if timeout, we should return, check for the owner and re-wait job done.
return nil
}
if err != nil {
return errors.Trace(err)
}
// all reorganization jobs done, drop this database
if err = t.DropDatabase(dbInfo.ID); err != nil {
return errors.Trace(err)
}
// finish this job
job.State = model.JobDone
job.SchemaState = model.StateNone
}
func (d *ddl) onDropSchema(t *meta.Meta, job *model.Job) error {
dbInfo, err := t.GetDatabase(job.SchemaID)
if err != nil {
@ -95,19 +124,16 @@ func (d *ddl) onDropSchema(t *meta.Meta, job *model.Job) error {
job.SchemaState = model.StateWriteOnly
dbInfo.State = model.StateWriteOnly
err = t.UpdateDatabase(dbInfo)
return errors.Trace(err)
case model.StateWriteOnly:
// write only -> delete only
job.SchemaState = model.StateDeleteOnly
dbInfo.State = model.StateDeleteOnly
err = t.UpdateDatabase(dbInfo)
return errors.Trace(err)
case model.StateDeleteOnly:
// delete only -> reorganization
job.SchemaState = model.StateDeleteReorganization
dbInfo.State = model.StateDeleteReorganization
err = t.UpdateDatabase(dbInfo)
return errors.Trace(err)
case model.StateDeleteReorganization:
// wait reorganization jobs done and drop meta.
var tables []*model.TableInfo
@ -136,11 +162,12 @@ func (d *ddl) onDropSchema(t *meta.Meta, job *model.Job) error {
// finish this job
job.State = model.JobDone
job.SchemaState = model.StateNone
return nil
default:
// we can't enter here.
return errors.Errorf("invalid db state %v", dbInfo.State)
err = errors.Errorf("invalid db state %v", dbInfo.State)
}
return errors.Trace(err)
}
func (d *ddl) dropSchemaData(dbInfo *model.DBInfo, tables []*model.TableInfo) error {

View File

@ -230,9 +230,8 @@ func (d *ddl) handleJobQueue() error {
// running job may cost some time, so here we must update owner status to
// prevent other become the owner.
owner.LastUpdateTS = time.Now().UnixNano()
if err = t.SetDDLOwner(owner); err != nil {
return errors.Trace(err)
}
err = t.SetDDLOwner(owner)
return errors.Trace(err)
})