From a724feaaa96a095e26dca807db38676bf686e01a Mon Sep 17 00:00:00 2001 From: siddontang Date: Tue, 17 Nov 2015 11:57:44 +0800 Subject: [PATCH] ddl: close should wait reorg coroutine returns too. --- ddl/column.go | 4 ++-- ddl/index.go | 2 +- ddl/reorg.go | 11 +++++++++-- ddl/worker.go | 3 +++ 4 files changed, 15 insertions(+), 5 deletions(-) diff --git a/ddl/column.go b/ddl/column.go index ee96afb0db..6554cbfe60 100644 --- a/ddl/column.go +++ b/ddl/column.go @@ -335,7 +335,7 @@ func (d *ddl) backfillColumnData(t table.Table, columnInfo *model.ColumnInfo, ha log.Info("backfill column...", handle) err := kv.RunInNewTxn(d.store, true, func(txn kv.Transaction) error { - if err := d.isOwnerInReorg(txn); err != nil { + if err := d.isReorgRunnable(txn); err != nil { return errors.Trace(err) } @@ -405,7 +405,7 @@ func (d *ddl) dropTableColumn(t table.Table, colInfo *model.ColumnInfo, reorgInf seekHandle = handles[len(handles)-1] + 1 err = kv.RunInNewTxn(d.store, true, func(txn kv.Transaction) error { - if err := d.isOwnerInReorg(txn); err != nil { + if err := d.isReorgRunnable(txn); err != nil { return errors.Trace(err) } diff --git a/ddl/index.go b/ddl/index.go index f131e706ba..c5f4b1b529 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -421,7 +421,7 @@ func (d *ddl) backfillTableIndex(t table.Table, indexInfo *model.IndexInfo, hand log.Debug("building index...", handle) err := kv.RunInNewTxn(d.store, true, func(txn kv.Transaction) error { - if err := d.isOwnerInReorg(txn); err != nil { + if err := d.isReorgRunnable(txn); err != nil { return errors.Trace(err) } diff --git a/ddl/reorg.go b/ddl/reorg.go index bf6ecac74d..3e80bd3238 100644 --- a/ddl/reorg.go +++ b/ddl/reorg.go @@ -102,8 +102,10 @@ var errWaitReorgTimeout = errors.New("wait for reorganization timeout") func (d *ddl) runReorgJob(f func() error) error { if d.reorgDoneCh == nil { // start a reorganization job + d.wait.Add(1) d.reorgDoneCh = make(chan error, 1) go func() { + defer d.wait.Done() d.reorgDoneCh <- f() }() } @@ -132,7 +134,12 @@ func (d *ddl) runReorgJob(f func() error) error { } } -func (d *ddl) isOwnerInReorg(txn kv.Transaction) error { +func (d *ddl) isReorgRunnable(txn kv.Transaction) error { + if d.isClosed() { + // worker is closed, can't run reorganization. + return errors.Trace(ErrWorkerClosed) + } + t := meta.NewMeta(txn) owner, err := t.GetDDLOwner() if err != nil { @@ -152,7 +159,7 @@ func (d *ddl) delKeysWithPrefix(prefix string) error { for { keys := keys[0:0] err := kv.RunInNewTxn(d.store, true, func(txn kv.Transaction) error { - if err := d.isOwnerInReorg(txn); err != nil { + if err := d.isReorgRunnable(txn); err != nil { return errors.Trace(err) } diff --git a/ddl/worker.go b/ddl/worker.go index 6736b63c20..2224ece157 100644 --- a/ddl/worker.go +++ b/ddl/worker.go @@ -166,6 +166,9 @@ func (d *ddl) finishJob(t *meta.Meta, job *model.Job) error { // ErrNotOwner means we are not owner and can't handle DDL jobs. var ErrNotOwner = errors.New("DDL: not owner") +// ErrWorkerClosed means we have already closed the DDL worker. +var ErrWorkerClosed = errors.New("DDL: worker is closed") + func (d *ddl) handleJobQueue() error { for { if d.isClosed() {