From 9c6b27877d0cda18ba28f2face8f8650fa84621e Mon Sep 17 00:00:00 2001 From: siddontang Date: Wed, 18 Nov 2015 09:53:14 +0800 Subject: [PATCH] *: must wait 2 * schema lease time when running job. --- ddl/stat.go | 2 +- ddl/worker.go | 32 +++++++++++++++++++++++++------- meta/meta.go | 2 +- model/ddl.go | 12 ++++++++---- 4 files changed, 35 insertions(+), 13 deletions(-) diff --git a/ddl/stat.go b/ddl/stat.go index 619ddd7619..bbba801333 100644 --- a/ddl/stat.go +++ b/ddl/stat.go @@ -72,7 +72,7 @@ func (d *ddl) Stat() (map[string]interface{}, error) { if job != nil { m["ddl_job_id"] = job.ID m["ddl_job_action"] = job.Type.String() - m["ddl_job_last_update_ts"] = job.LastUpdateTS + m["ddl_job_last_update_ts"] = job.LastUpdateTS / 1e9 m["ddl_job_state"] = job.State.String() m["ddl_job_error"] = job.Error m["ddl_job_schema_state"] = job.SchemaState.String() diff --git a/ddl/worker.go b/ddl/worker.go index 2224ece157..7418a7fa00 100644 --- a/ddl/worker.go +++ b/ddl/worker.go @@ -175,6 +175,8 @@ func (d *ddl) handleJobQueue() error { return nil } + waitTime := 2 * d.lease + var job *model.Job err := kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error { t := meta.NewMeta(txn) @@ -193,6 +195,20 @@ func (d *ddl) handleJobQueue() error { return errors.Trace(err) } + if job.State == model.JobRunning { + // if we enter a new state, crash when waiting 2 * lease time, and restart quickly, + // we may run the job immediately again, but we don't wait enough 2 * lease time to + // let other servers update the schema. + // so here we must check the elapsed time from last update, if < 2 * lease, we must + // wait again. 
+ elapsed := time.Duration(time.Now().UnixNano() - job.LastUpdateTS) + if elapsed > 0 && elapsed < waitTime { + log.Warnf("the elapsed time from last update is %s < %s, wait again", elapsed, waitTime) + waitTime -= elapsed + return nil + } + } + log.Warnf("run DDL job %v", job) d.hook.OnJobRunBefore(job) @@ -236,7 +252,7 @@ func (d *ddl) handleJobQueue() error { // if the job is done or still running, we will wait 2 * lease time to guarantee other servers to update // the newest schema. if job.State == model.JobRunning || job.State == model.JobDone { - d.waitSchemaChanged() + d.waitSchemaChanged(waitTime) } } } @@ -322,11 +338,13 @@ func (d *ddl) runJob(t *meta.Meta, job *model.Job) { // for every lease seconds, we will re-update the whole schema, so we will wait 2 * lease time // to guarantee that all servers have already updated schema. -func (d *ddl) waitSchemaChanged() { - if d.lease > 0 { - select { - case <-time.After(d.lease * 2): - case <-d.quitCh: - } +func (d *ddl) waitSchemaChanged(waitTime time.Duration) { + if waitTime == 0 { + return + } + + select { + case <-time.After(waitTime): + case <-d.quitCh: } } diff --git a/meta/meta.go b/meta/meta.go index 2ef46af2d3..6536ff5d7f 100644 --- a/meta/meta.go +++ b/meta/meta.go @@ -479,7 +479,7 @@ func (m *Meta) GetDDLJob(index int64) (*model.Job, error) { // UpdateDDLJob updates the DDL job with index. func (m *Meta) UpdateDDLJob(index int64, job *model.Job) error { // TODO: use timestamp allocated by TSO - job.LastUpdateTS = time.Now().Unix() + job.LastUpdateTS = time.Now().UnixNano() b, err := job.Encode() if err != nil { return errors.Trace(err) diff --git a/model/ddl.go b/model/ddl.go index 4f35b2c8fc..d1c2e4d9f2 100644 --- a/model/ddl.go +++ b/model/ddl.go @@ -74,8 +74,10 @@ type Job struct { RawArgs json.RawMessage `json:"raw_args"` SchemaState SchemaState `json:"schema_state"` // snapshot version for this job. 
- SnapshotVer uint64 `json:"snapshot_ver"` - LastUpdateTS int64 `json:"last_update_ts"` + SnapshotVer uint64 `json:"snapshot_ver"` + // unix nano seconds + // TODO: use timestamp allocated by TSO + LastUpdateTS int64 `json:"last_update_ts"` } // Encode encodes job with json format. @@ -138,6 +140,8 @@ func (s JobState) String() string { // Owner is for DDL Owner. type Owner struct { - OwnerID string `json:"owner_id"` - LastUpdateTS int64 `json:"last_update_ts"` + OwnerID string `json:"owner_id"` + // unix nano seconds + // TODO: use timestamp allocated by TSO + LastUpdateTS int64 `json:"last_update_ts"` }