From 8ecaf67d9a7ef0049feeee8a59a12a75fa54b508 Mon Sep 17 00:00:00 2001 From: xia Date: Wed, 6 Jan 2016 12:42:03 +0800 Subject: [PATCH] ddl: add tasks --- ddl/schema.go | 37 ++++++++++++++++++++++++++++++++----- ddl/worker.go | 5 ++--- 2 files changed, 34 insertions(+), 8 deletions(-) diff --git a/ddl/schema.go b/ddl/schema.go index 1bab5c7316..4780b8532e 100644 --- a/ddl/schema.go +++ b/ddl/schema.go @@ -74,6 +74,35 @@ func (d *ddl) onCreateSchema(t *meta.Meta, job *model.Job) error { } } +func (d *ddl) delReorgSchema(t *meta.Meta, task *model.Job) error { + // wait reorganization jobs done and drop meta. + tables, err := t.ListTables(dbInfo.ID) + if err != nil { + return errors.Trace(err) + } + + err = d.runReorgJob(func() error { + return d.dropSchemaData(dbInfo, tables) + }) + + if terror.ErrorEqual(err, errWaitReorgTimeout) { + // if timeout, we should return, check for the owner and re-wait job done. + return nil + } + if err != nil { + return errors.Trace(err) + } + + // all reorganization jobs done, drop this database + if err = t.DropDatabase(dbInfo.ID); err != nil { + return errors.Trace(err) + } + + // finish this job + job.State = model.JobDone + job.SchemaState = model.StateNone +} + func (d *ddl) onDropSchema(t *meta.Meta, job *model.Job) error { dbInfo, err := t.GetDatabase(job.SchemaID) if err != nil { @@ -95,19 +124,16 @@ func (d *ddl) onDropSchema(t *meta.Meta, job *model.Job) error { job.SchemaState = model.StateWriteOnly dbInfo.State = model.StateWriteOnly err = t.UpdateDatabase(dbInfo) - return errors.Trace(err) case model.StateWriteOnly: // write only -> delete only job.SchemaState = model.StateDeleteOnly dbInfo.State = model.StateDeleteOnly err = t.UpdateDatabase(dbInfo) - return errors.Trace(err) case model.StateDeleteOnly: // delete only -> reorganization job.SchemaState = model.StateDeleteReorganization dbInfo.State = model.StateDeleteReorganization err = t.UpdateDatabase(dbInfo) - return errors.Trace(err) case model.StateDeleteReorganization: // wait reorganization jobs done and drop meta. var tables []*model.TableInfo @@ -136,11 +162,12 @@ func (d *ddl) onDropSchema(t *meta.Meta, job *model.Job) error { // finish this job job.State = model.JobDone job.SchemaState = model.StateNone - return nil default: // we can't enter here. - return errors.Errorf("invalid db state %v", dbInfo.State) + err = errors.Errorf("invalid db state %v", dbInfo.State) } + + return errors.Trace(err) } func (d *ddl) dropSchemaData(dbInfo *model.DBInfo, tables []*model.TableInfo) error { diff --git a/ddl/worker.go b/ddl/worker.go index d914f68a88..18a4240f65 100644 --- a/ddl/worker.go +++ b/ddl/worker.go @@ -230,9 +230,8 @@ func (d *ddl) handleJobQueue() error { // running job may cost some time, so here we must update owner status to // prevent other become the owner. owner.LastUpdateTS = time.Now().UnixNano() - if err = t.SetDDLOwner(owner); err != nil { - return errors.Trace(err) - } + err = t.SetDDLOwner(owner) + return errors.Trace(err) })