ddl: close should wait reorg coroutine returns too.
This commit is contained in:
@ -335,7 +335,7 @@ func (d *ddl) backfillColumnData(t table.Table, columnInfo *model.ColumnInfo, ha
|
||||
log.Info("backfill column...", handle)
|
||||
|
||||
err := kv.RunInNewTxn(d.store, true, func(txn kv.Transaction) error {
|
||||
if err := d.isOwnerInReorg(txn); err != nil {
|
||||
if err := d.isReorgRunnable(txn); err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
@ -405,7 +405,7 @@ func (d *ddl) dropTableColumn(t table.Table, colInfo *model.ColumnInfo, reorgInf
|
||||
seekHandle = handles[len(handles)-1] + 1
|
||||
|
||||
err = kv.RunInNewTxn(d.store, true, func(txn kv.Transaction) error {
|
||||
if err := d.isOwnerInReorg(txn); err != nil {
|
||||
if err := d.isReorgRunnable(txn); err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
|
||||
@ -421,7 +421,7 @@ func (d *ddl) backfillTableIndex(t table.Table, indexInfo *model.IndexInfo, hand
|
||||
log.Debug("building index...", handle)
|
||||
|
||||
err := kv.RunInNewTxn(d.store, true, func(txn kv.Transaction) error {
|
||||
if err := d.isOwnerInReorg(txn); err != nil {
|
||||
if err := d.isReorgRunnable(txn); err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
|
||||
11
ddl/reorg.go
11
ddl/reorg.go
@ -102,8 +102,10 @@ var errWaitReorgTimeout = errors.New("wait for reorganization timeout")
|
||||
func (d *ddl) runReorgJob(f func() error) error {
|
||||
if d.reorgDoneCh == nil {
|
||||
// start a reorganization job
|
||||
d.wait.Add(1)
|
||||
d.reorgDoneCh = make(chan error, 1)
|
||||
go func() {
|
||||
defer d.wait.Done()
|
||||
d.reorgDoneCh <- f()
|
||||
}()
|
||||
}
|
||||
@ -132,7 +134,12 @@ func (d *ddl) runReorgJob(f func() error) error {
|
||||
}
|
||||
}
|
||||
|
||||
func (d *ddl) isOwnerInReorg(txn kv.Transaction) error {
|
||||
func (d *ddl) isReorgRunnable(txn kv.Transaction) error {
|
||||
if d.isClosed() {
|
||||
// worker is closed, can't run reorganization.
|
||||
return errors.Trace(ErrWorkerClosed)
|
||||
}
|
||||
|
||||
t := meta.NewMeta(txn)
|
||||
owner, err := t.GetDDLOwner()
|
||||
if err != nil {
|
||||
@ -152,7 +159,7 @@ func (d *ddl) delKeysWithPrefix(prefix string) error {
|
||||
for {
|
||||
keys := keys[0:0]
|
||||
err := kv.RunInNewTxn(d.store, true, func(txn kv.Transaction) error {
|
||||
if err := d.isOwnerInReorg(txn); err != nil {
|
||||
if err := d.isReorgRunnable(txn); err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
|
||||
@ -166,6 +166,9 @@ func (d *ddl) finishJob(t *meta.Meta, job *model.Job) error {
|
||||
// ErrNotOwner means we are not owner and can't handle DDL jobs.
|
||||
var ErrNotOwner = errors.New("DDL: not owner")
|
||||
|
||||
// ErrWorkerClosed means we have already closed the DDL worker.
|
||||
var ErrWorkerClosed = errors.New("DDL: worker is closed")
|
||||
|
||||
func (d *ddl) handleJobQueue() error {
|
||||
for {
|
||||
if d.isClosed() {
|
||||
|
||||
Reference in New Issue
Block a user