diff --git a/ddl/backfilling.go b/ddl/backfilling.go
index b133ac12b1..362a372aa7 100644
--- a/ddl/backfilling.go
+++ b/ddl/backfilling.go
@@ -146,6 +146,7 @@ type backfillTaskContext struct {
 type backfillWorker struct {
 	id        int
 	ddlWorker *worker
+	reorgInfo *reorgInfo
 	batchCnt  int
 	sessCtx   sessionctx.Context
 	taskCh    chan *reorgBackfillTask
@@ -155,11 +156,12 @@ type backfillWorker struct {
 	priority int
 }

-func newBackfillWorker(sessCtx sessionctx.Context, worker *worker, id int, t table.PhysicalTable) *backfillWorker {
+func newBackfillWorker(sessCtx sessionctx.Context, worker *worker, id int, t table.PhysicalTable, reorgInfo *reorgInfo) *backfillWorker {
 	return &backfillWorker{
 		id:        id,
 		table:     t,
 		ddlWorker: worker,
+		reorgInfo: reorgInfo,
 		batchCnt:  int(variable.GetDDLReorgBatchSize()),
 		sessCtx:   sessCtx,
 		taskCh:    make(chan *reorgBackfillTask, 1),
@@ -234,13 +236,14 @@ func (w *backfillWorker) handleBackfillTask(d *ddlCtx, task *reorgBackfillTask,
 	lastLogCount := 0
 	lastLogTime := time.Now()
 	startTime := lastLogTime
+	rc := d.getReorgCtx(w.reorgInfo.Job)

 	for {
 		// Give the job a chance to be canceled. If we don't check it here,
 		// a panic in bf.BackfillDataInTxn would mean the job is never canceled.
 		// Because reorgRecordTask may run for a long time,
 		// we should check whether this ddl job is still runnable.
-		err := w.ddlWorker.isReorgRunnable(d)
+		err := w.ddlWorker.isReorgRunnable(w.reorgInfo.Job)
 		if err != nil {
 			result.err = err
 			return result
@@ -263,8 +266,8 @@ func (w *backfillWorker) handleBackfillTask(d *ddlCtx, task *reorgBackfillTask,
 		// small ranges. This will cause the `redo` action in reorganization.
 		// So for the added count and warnings collection, it is recommended to collect the statistics
 		// for every successfully committed small range rather than fetching them from the total result.
-		w.ddlWorker.reorgCtx.increaseRowCount(int64(taskCtx.addedCount))
-		w.ddlWorker.reorgCtx.mergeWarnings(taskCtx.warnings, taskCtx.warningsCount)
+		rc.increaseRowCount(int64(taskCtx.addedCount))
+		rc.mergeWarnings(taskCtx.warnings, taskCtx.warningsCount)

 		if num := result.scanCount - lastLogCount; num >= 30000 {
 			lastLogCount = result.scanCount
@@ -399,7 +402,7 @@ func (w *worker) handleReorgTasks(reorgInfo *reorgInfo, totalAddedCount *int64,
 	nextKey, taskAddedCount, err := w.waitTaskResults(workers, taskCnt, totalAddedCount, startKey)
 	elapsedTime := time.Since(startTime)
 	if err == nil {
-		err = w.isReorgRunnable(reorgInfo.d)
+		err = w.isReorgRunnable(reorgInfo.Job)
 	}

 	if err != nil {
@@ -420,7 +423,7 @@ func (w *worker) handleReorgTasks(reorgInfo *reorgInfo, totalAddedCount *int64,
 	}

 	// nextHandle will be updated periodically in runReorgJob, so no need to update it here.
-	w.reorgCtx.setNextKey(nextKey)
+	w.getReorgCtx(reorgInfo.Job).setNextKey(nextKey)
 	metrics.BatchAddIdxHistogram.WithLabelValues(metrics.LblOK).Observe(elapsedTime.Seconds())
 	logutil.BgLogger().Info("[ddl] backfill workers successfully processed batch",
 		zap.ByteString("elementType", reorgInfo.currElement.TypeKey),
@@ -583,7 +586,7 @@ func (w *worker) writePhysicalTableRecord(t table.PhysicalTable, bfWorkerType ba
 		return errors.Trace(err)
 	}

-	if err := w.isReorgRunnable(reorgInfo.d); err != nil {
+	if err := w.isReorgRunnable(reorgInfo.Job); err != nil {
 		return errors.Trace(err)
 	}
 	if startKey == nil && endKey == nil {
@@ -642,19 +645,19 @@ func (w *worker) writePhysicalTableRecord(t table.PhysicalTable, bfWorkerType ba

 			switch bfWorkerType {
 			case typeAddIndexWorker:
-				idxWorker := newAddIndexWorker(sessCtx, w, i, t, indexInfo, decodeColMap, reorgInfo.ReorgMeta.SQLMode)
+				idxWorker := newAddIndexWorker(sessCtx, w, i, t, indexInfo, decodeColMap, reorgInfo)
 				idxWorker.priority = job.Priority
 				backfillWorkers = append(backfillWorkers, idxWorker.backfillWorker)
 				go idxWorker.backfillWorker.run(reorgInfo.d, idxWorker, job)
 			case typeUpdateColumnWorker:
 				// Setting InCreateOrAlterStmt tells the difference between SELECT casting and ALTER COLUMN casting.
 				sessCtx.GetSessionVars().StmtCtx.InCreateOrAlterStmt = true
-				updateWorker := newUpdateColumnWorker(sessCtx, w, i, t, oldColInfo, colInfo, decodeColMap, reorgInfo.ReorgMeta.SQLMode)
+				updateWorker := newUpdateColumnWorker(sessCtx, w, i, t, oldColInfo, colInfo, decodeColMap, reorgInfo)
 				updateWorker.priority = job.Priority
 				backfillWorkers = append(backfillWorkers, updateWorker.backfillWorker)
 				go updateWorker.backfillWorker.run(reorgInfo.d, updateWorker, job)
 			case typeCleanUpIndexWorker:
-				idxWorker := newCleanUpIndexWorker(sessCtx, w, i, t, decodeColMap, reorgInfo.ReorgMeta.SQLMode)
+				idxWorker := newCleanUpIndexWorker(sessCtx, w, i, t, decodeColMap, reorgInfo)
 				idxWorker.priority = job.Priority
 				backfillWorkers = append(backfillWorkers, idxWorker.backfillWorker)
 				go idxWorker.backfillWorker.run(reorgInfo.d, idxWorker, job)
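Reviewer note on the backfilling.go hunks above: every backfill worker of one job now resolves the same per-job reorgCtx via d.getReorgCtx(w.reorgInfo.Job) and reports into it concurrently, so increaseRowCount must be safe for many writers. A minimal sketch of that shared-counter pattern, with illustrative names (reorgProgress is not a real TiDB type):

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// reorgProgress stands in for the per-job reorg context shared by all
// backfill workers of one job; the row count is updated atomically so
// workers need no lock on the hot path.
type reorgProgress struct {
	rowCount int64
}

func (p *reorgProgress) increaseRowCount(n int64) {
	atomic.AddInt64(&p.rowCount, n)
}

func (p *reorgProgress) getRowCount() int64 {
	return atomic.LoadInt64(&p.rowCount)
}

func main() {
	progress := &reorgProgress{}
	var wg sync.WaitGroup
	// Eight "workers" each commit ten small ranges of 100 rows.
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := 0; j < 10; j++ {
				progress.increaseRowCount(100)
			}
		}()
	}
	wg.Wait()
	fmt.Println("rows backfilled:", progress.getRowCount()) // always 8000
}
```

Because each worker adds its count only after a small range commits, the running total never includes rows a retry would redo, which is exactly the per-committed-range statistics recommendation in the comment above.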
diff --git a/ddl/column.go b/ddl/column.go
index 6e0c166a12..e973e944f0 100644
--- a/ddl/column.go
+++ b/ddl/column.go
@@ -1060,8 +1060,6 @@ func doReorgWorkForModifyColumn(w *worker, d *ddlCtx, t *meta.Meta, job *model.J
 			return false, ver, nil
 		}
 		if kv.IsTxnRetryableError(err) {
-			// Clean up the channel of notifyCancelReorgJob. Make sure it can't affect other jobs.
-			w.reorgCtx.cleanNotifyReorgCancel()
 			return false, ver, errors.Trace(err)
 		}
 		if err1 := t.RemoveDDLReorgHandle(job, reorgInfo.elements); err1 != nil {
@@ -1070,12 +1068,8 @@ func doReorgWorkForModifyColumn(w *worker, d *ddlCtx, t *meta.Meta, job *model.J
 		}
 		logutil.BgLogger().Warn("[ddl] run modify column job failed, convert job to rollback", zap.String("job", job.String()), zap.Error(err))
 		job.State = model.JobStateRollingback
-		// Clean up the channel of notifyCancelReorgJob. Make sure it can't affect other jobs.
-		w.reorgCtx.cleanNotifyReorgCancel()
 		return false, ver, errors.Trace(err)
 	}
-	// Clean up the channel of notifyCancelReorgJob. Make sure it can't affect other jobs.
-	w.reorgCtx.cleanNotifyReorgCancel()
 	return true, ver, nil
 }

@@ -1235,7 +1229,7 @@ func (w *worker) updateColumnAndIndexes(t table.Table, oldCol, col *model.Column
 			TestReorgGoroutineRunning <- a
 			for {
 				time.Sleep(30 * time.Millisecond)
-				if w.reorgCtx.isReorgCanceled() {
+				if w.getReorgCtx(reorgInfo.Job).isReorgCanceled() {
 					// Job is cancelled. So it can't be done.
 					failpoint.Return(dbterror.ErrCancelledDDLJob)
 				}
 			}
 		}
 	})
@@ -1281,7 +1275,7 @@ func (w *worker) updateColumnAndIndexes(t table.Table, oldCol, col *model.Column
 		}

 		// Update the element in the reorgCtx to keep the atomic access for daemon-worker.
-		w.reorgCtx.setCurrentElement(reorgInfo.elements[i+1])
+		w.getReorgCtx(reorgInfo.Job).setCurrentElement(reorgInfo.elements[i+1])

 		// Update the element in the reorgInfo for updating the reorg meta below.
 		reorgInfo.currElement = reorgInfo.elements[i+1]
@@ -1320,16 +1314,16 @@ type updateColumnWorker struct {
 	sqlMode mysql.SQLMode
 }

-func newUpdateColumnWorker(sessCtx sessionctx.Context, worker *worker, id int, t table.PhysicalTable, oldCol, newCol *model.ColumnInfo, decodeColMap map[int64]decoder.Column, sqlMode mysql.SQLMode) *updateColumnWorker {
+func newUpdateColumnWorker(sessCtx sessionctx.Context, worker *worker, id int, t table.PhysicalTable, oldCol, newCol *model.ColumnInfo, decodeColMap map[int64]decoder.Column, reorgInfo *reorgInfo) *updateColumnWorker {
 	rowDecoder := decoder.NewRowDecoder(t, t.WritableCols(), decodeColMap)
 	return &updateColumnWorker{
-		backfillWorker: newBackfillWorker(sessCtx, worker, id, t),
+		backfillWorker: newBackfillWorker(sessCtx, worker, id, t, reorgInfo),
 		oldColInfo:     oldCol,
 		newColInfo:     newCol,
 		metricCounter:  metrics.BackfillTotalCounter.WithLabelValues("update_col_rate"),
 		rowDecoder:     rowDecoder,
 		rowMap:         make(map[int64]types.Datum, len(decodeColMap)),
-		sqlMode:        sqlMode,
+		sqlMode:        reorgInfo.ReorgMeta.SQLMode,
 	}
 }
diff --git a/ddl/ddl.go b/ddl/ddl.go
index c724068080..a41c456b08 100644
--- a/ddl/ddl.go
+++ b/ddl/ddl.go
@@ -41,6 +41,7 @@ import (
 	"github.com/pingcap/tidb/parser/ast"
 	"github.com/pingcap/tidb/parser/model"
 	"github.com/pingcap/tidb/parser/mysql"
+	"github.com/pingcap/tidb/parser/terror"
 	"github.com/pingcap/tidb/sessionctx"
 	"github.com/pingcap/tidb/sessionctx/binloginfo"
 	"github.com/pingcap/tidb/sessionctx/variable"
@@ -222,6 +223,13 @@ type ddlCtx struct {
 	tableLockCkr util.DeadTableLockChecker
 	etcdCli      *clientv3.Client

+	// reorgCtx is used for reorganization.
+	reorgCtx struct {
+		sync.RWMutex
+		// reorgCtxMap maps job ID to reorg context.
+		reorgCtxMap map[int64]*reorgCtx
+	}
+
 	// hook may be modified.
 	mu struct {
 		sync.RWMutex
@@ -244,6 +252,41 @@ func (dc *ddlCtx) isOwner() bool {
 	return isOwner
 }

+func (dc *ddlCtx) getReorgCtx(job *model.Job) *reorgCtx {
+	dc.reorgCtx.RLock()
+	defer dc.reorgCtx.RUnlock()
+	return dc.reorgCtx.reorgCtxMap[job.ID]
+}
+
+func (dc *ddlCtx) newReorgCtx(r *reorgInfo) *reorgCtx {
+	rc := &reorgCtx{}
+	rc.doneCh = make(chan error, 1)
+	// Initialize the reorgCtx with the job's current progress.
+	rc.setRowCount(r.Job.GetRowCount())
+	rc.setNextKey(r.StartKey)
+	rc.setCurrentElement(r.currElement)
+	rc.mu.warnings = make(map[errors.ErrorID]*terror.Error)
+	rc.mu.warningsCount = make(map[errors.ErrorID]int64)
+	dc.reorgCtx.Lock()
+	defer dc.reorgCtx.Unlock()
+	dc.reorgCtx.reorgCtxMap[r.Job.ID] = rc
+	return rc
+}
+
+func (dc *ddlCtx) removeReorgCtx(job *model.Job) {
+	dc.reorgCtx.Lock()
+	defer dc.reorgCtx.Unlock()
+	delete(dc.reorgCtx.reorgCtxMap, job.ID)
+}
+
+func (dc *ddlCtx) notifyReorgCancel(job *model.Job) {
+	rc := dc.getReorgCtx(job)
+	if rc == nil {
+		return
+	}
+	rc.notifyReorgCancel()
+}
+
 // EnableTiFlashPoll enables TiFlash poll loop aka PollTiFlashReplicaStatus.
 func EnableTiFlashPoll(d interface{}) {
 	if dd, ok := d.(*ddl); ok {
@@ -342,6 +385,7 @@ func newDDL(ctx context.Context, options ...Option) *ddl {
 		tableLockCkr:              deadLockCkr,
 		etcdCli:                   opt.EtcdCli,
 	}
+	ddlCtx.reorgCtx.reorgCtxMap = make(map[int64]*reorgCtx)
 	ddlCtx.mu.hook = opt.Hook
 	ddlCtx.mu.interceptor = &BaseInterceptor{}
 	d := &ddl{
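The ddl.go additions above are the heart of the change: reorg state moves from each worker into a job-ID-keyed registry on ddlCtx, so several reorg jobs can be tracked at once and a cancel can target one job without disturbing the others. A self-contained sketch of the same RWMutex-guarded map shape, using stand-in types (jobCtx and registry are illustrative, not TiDB definitions):

```go
package main

import (
	"fmt"
	"sync"
)

// jobCtx stands in for reorgCtx; registry mirrors the RWMutex-plus-map
// struct embedded in ddlCtx above.
type jobCtx struct{ rows int64 }

type registry struct {
	mu sync.RWMutex
	m  map[int64]*jobCtx
}

func newRegistry() *registry {
	return &registry{m: make(map[int64]*jobCtx)}
}

// get takes the read lock: many backfill workers can look up their
// job's context concurrently.
func (r *registry) get(jobID int64) *jobCtx {
	r.mu.RLock()
	defer r.mu.RUnlock()
	return r.m[jobID]
}

// add and remove take the write lock; each runs once per job, so
// write contention stays negligible.
func (r *registry) add(jobID int64, c *jobCtx) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.m[jobID] = c
}

func (r *registry) remove(jobID int64) {
	r.mu.Lock()
	defer r.mu.Unlock()
	delete(r.m, jobID)
}

func main() {
	r := newRegistry()
	r.add(1, &jobCtx{})
	fmt.Println(r.get(1) != nil) // true while the job is reorganizing
	r.remove(1)
	fmt.Println(r.get(1) == nil) // true: lookups after removal see nil
}
```

Lookups sit on the workers' hot path, hence the shared read lock; a job's lifetime needs exactly one add and one remove under the write lock.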
diff --git a/ddl/ddl_test.go b/ddl/ddl_test.go
index 3f3d02a877..8728542be3 100644
--- a/ddl/ddl_test.go
+++ b/ddl/ddl_test.go
@@ -562,7 +562,7 @@ func TestReorg(t *testing.T) {
 			require.Equal(t, ctx.Value(testCtxKey), 1)
 			ctx.ClearValue(testCtxKey)

-			err = sessiontxn.NewTxn(context.Background(), ctx)
+			err = ctx.NewTxn(context.Background())
 			require.NoError(t, err)
 			txn, err := ctx.Txn(true)
 			require.NoError(t, err)
@@ -571,7 +571,7 @@ func TestReorg(t *testing.T) {
 			err = txn.Rollback()
 			require.NoError(t, err)

-			err = sessiontxn.NewTxn(context.Background(), ctx)
+			err = ctx.NewTxn(context.Background())
 			require.NoError(t, err)
 			txn, err = ctx.Txn(true)
 			require.NoError(t, err)
@@ -582,17 +582,11 @@ func TestReorg(t *testing.T) {
 			rowCount := int64(10)
 			handle := test.handle

-			f := func() error {
-				d.generalWorker().reorgCtx.setRowCount(rowCount)
-				d.generalWorker().reorgCtx.setNextKey(handle.Encoded())
-				time.Sleep(1*ReorgWaitTimeout + 100*time.Millisecond)
-				return nil
-			}
 			job := &model.Job{
 				ID:          1,
 				SnapshotVer: 1, // Make sure it is not zero. So the reorgInfo's first is false.
 			}
-			err = sessiontxn.NewTxn(context.Background(), ctx)
+			err = ctx.NewTxn(context.Background())
 			require.NoError(t, err)
 			txn, err = ctx.Txn(true)
 			require.NoError(t, err)
@@ -601,6 +595,13 @@ func TestReorg(t *testing.T) {
 			rInfo := &reorgInfo{
 				Job:         job,
 				currElement: e,
+				d:           d.ddlCtx,
+			}
+			f := func() error {
+				d.getReorgCtx(job).setRowCount(rowCount)
+				d.getReorgCtx(job).setNextKey(handle.Encoded())
+				time.Sleep(1*ReorgWaitTimeout + 100*time.Millisecond)
+				return nil
 			}
 			mockTbl := tables.MockTableFromMeta(&model.TableInfo{IsCommonHandle: test.isCommonHandle, CommonHandleVersion: 1})
 			err = d.generalWorker().runReorgJob(m, rInfo, mockTbl.Meta(), d.lease, f)
@@ -612,12 +613,11 @@ func TestReorg(t *testing.T) {
 			err = d.generalWorker().runReorgJob(m, rInfo, mockTbl.Meta(), d.lease, f)
 			if err == nil {
 				require.Equal(t, job.RowCount, rowCount)
-				require.Equal(t, d.generalWorker().reorgCtx.rowCount, int64(0))

 				// Test whether reorgInfo's Handle is updated.
 				err = txn.Commit(context.Background())
 				require.NoError(t, err)
-				err = sessiontxn.NewTxn(context.Background(), ctx)
+				err = ctx.NewTxn(context.Background())
 				require.NoError(t, err)

 				m = meta.NewMeta(txn)
 				require.NoError(t, err1)
 				require.Equal(t, info.StartKey, kv.Key(handle.Encoded()))
 				require.Equal(t, info.currElement, e)
-				_, doneHandle, _ := d.generalWorker().reorgCtx.getRowCountAndKey()
-				require.Nil(t, doneHandle)
 				break
 			}
 		}
diff --git a/ddl/ddl_worker.go b/ddl/ddl_worker.go
index cae3be063d..55968f191c 100644
--- a/ddl/ddl_worker.go
+++ b/ddl/ddl_worker.go
@@ -90,7 +90,6 @@ type worker struct {
 	wg sync.WaitGroup

 	sessPool        *sessionPool // sessPool is used to new sessions to execute SQL in ddl package.
-	reorgCtx        *reorgCtx    // reorgCtx is used for reorganization.
 	delRangeManager delRangeManager
 	logCtx          context.Context
 	lockSeqNum      bool
@@ -126,7 +125,6 @@ func newWorker(ctx context.Context, tp workerType, sessPool *sessionPool, delRan
 		ctx:             ctx,
 		JobContext:      NewJobContext(),
 		ddlCtx:          dCtx,
-		reorgCtx:        &reorgCtx{notifyCancelReorgJob: 0},
 		sessPool:        sessPool,
 		delRangeManager: delRangeMgr,
 	}
diff --git a/ddl/index.go b/ddl/index.go
index f4d7402825..298c92e210 100644
--- a/ddl/index.go
+++ b/ddl/index.go
@@ -613,12 +613,8 @@ func doReorgWorkForCreateIndex(w *worker, d *ddlCtx, t *meta.Meta, job *model.Jo
 				logutil.BgLogger().Warn("[ddl] run add index job failed, convert job to rollback, RemoveDDLReorgHandle failed", zap.String("job", job.String()), zap.Error(err1))
 			}
 		}
-		// Clean up the channel of notifyCancelReorgJob. Make sure it can't affect other jobs.
-		w.reorgCtx.cleanNotifyReorgCancel()
 		return false, ver, errors.Trace(err)
 	}
-	// Clean up the channel of notifyCancelReorgJob. Make sure it can't affect other jobs.
-	w.reorgCtx.cleanNotifyReorgCancel()
 	return true, ver, errors.Trace(err)
 }

@@ -1041,18 +1037,18 @@ type addIndexWorker struct {
 	distinctCheckFlags []bool
 }

-func newAddIndexWorker(sessCtx sessionctx.Context, worker *worker, id int, t table.PhysicalTable, indexInfo *model.IndexInfo, decodeColMap map[int64]decoder.Column, sqlMode mysql.SQLMode) *addIndexWorker {
+func newAddIndexWorker(sessCtx sessionctx.Context, worker *worker, id int, t table.PhysicalTable, indexInfo *model.IndexInfo, decodeColMap map[int64]decoder.Column, reorgInfo *reorgInfo) *addIndexWorker {
 	index := tables.NewIndex(t.GetPhysicalID(), t.Meta(), indexInfo)
 	rowDecoder := decoder.NewRowDecoder(t, t.WritableCols(), decodeColMap)
 	return &addIndexWorker{
 		baseIndexWorker: baseIndexWorker{
-			backfillWorker: newBackfillWorker(sessCtx, worker, id, t),
+			backfillWorker: newBackfillWorker(sessCtx, worker, id, t, reorgInfo),
 			indexes:        []table.Index{index},
 			rowDecoder:     rowDecoder,
 			defaultVals:    make([]types.Datum, len(t.WritableCols())),
 			rowMap:         make(map[int64]types.Datum, len(decodeColMap)),
 			metricCounter:  metrics.BackfillTotalCounter.WithLabelValues("add_idx_rate"),
-			sqlMode:        sqlMode,
+			sqlMode:        reorgInfo.ReorgMeta.SQLMode,
 		},
 		index: index,
 	}
@@ -1486,7 +1482,7 @@ type cleanUpIndexWorker struct {
 	baseIndexWorker
 }

-func newCleanUpIndexWorker(sessCtx sessionctx.Context, worker *worker, id int, t table.PhysicalTable, decodeColMap map[int64]decoder.Column, sqlMode mysql.SQLMode) *cleanUpIndexWorker {
+func newCleanUpIndexWorker(sessCtx sessionctx.Context, worker *worker, id int, t table.PhysicalTable, decodeColMap map[int64]decoder.Column, reorgInfo *reorgInfo) *cleanUpIndexWorker {
 	indexes := make([]table.Index, 0, len(t.Indices()))
 	rowDecoder := decoder.NewRowDecoder(t, t.WritableCols(), decodeColMap)
 	for _, index := range t.Indices() {
@@ -1496,13 +1492,13 @@ func newCleanUpIndexWorker(sessCtx sessionctx.Context, worker *worker, id int, t
 	}
 	return &cleanUpIndexWorker{
 		baseIndexWorker: baseIndexWorker{
-			backfillWorker: newBackfillWorker(sessCtx, worker, id, t),
+			backfillWorker: newBackfillWorker(sessCtx, worker, id, t, reorgInfo),
 			indexes:        indexes,
 			rowDecoder:     rowDecoder,
 			defaultVals:    make([]types.Datum, len(t.WritableCols())),
 			rowMap:         make(map[int64]types.Datum, len(decodeColMap)),
 			metricCounter:  metrics.BackfillTotalCounter.WithLabelValues("cleanup_idx_rate"),
-			sqlMode:        sqlMode,
+			sqlMode:        reorgInfo.ReorgMeta.SQLMode,
 		},
 	}
 }
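A small pattern worth naming in the constructor changes above (and the matching ones in backfilling.go and column.go): instead of plucking ReorgMeta.SQLMode out of reorgInfo at every call site, the whole reorgInfo is passed and each constructor derives sqlMode itself. A toy sketch of that "pass the aggregate, derive the field" shape, with hypothetical types:

```go
package main

import "fmt"

// reorgMeta and reorgInfoLite are illustrative stand-ins, not the
// real TiDB definitions.
type reorgMeta struct{ sqlMode string }

type reorgInfoLite struct{ meta reorgMeta }

type backfiller struct {
	info    *reorgInfoLite
	sqlMode string
}

// newBackfiller receives the whole reorg descriptor and derives sqlMode
// from it, so the two fields cannot drift apart and call sites shrink.
func newBackfiller(info *reorgInfoLite) *backfiller {
	return &backfiller{info: info, sqlMode: info.meta.sqlMode}
}

func main() {
	b := newBackfiller(&reorgInfoLite{meta: reorgMeta{sqlMode: "STRICT_TRANS_TABLES"}})
	fmt.Println(b.sqlMode)
}
```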
diff --git a/ddl/partition.go b/ddl/partition.go
index d024ef47b4..e01db34e0e 100644
--- a/ddl/partition.go
+++ b/ddl/partition.go
@@ -1185,12 +1185,8 @@ func (w *worker) onDropTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (
 				// if timeout, we should return, check for the owner and re-wait job done.
 				return ver, nil
 			}
-			// Clean up the channel of notifyCancelReorgJob. Make sure it can't affect other jobs.
-			w.reorgCtx.cleanNotifyReorgCancel()
 			return ver, errors.Trace(err)
 		}
-		// Clean up the channel of notifyCancelReorgJob. Make sure it can't affect other jobs.
-		w.reorgCtx.cleanNotifyReorgCancel()
 	}
 	tblInfo.Partition.DroppingDefinitions = nil
 	// used by ApplyDiff in updateSchemaVersion
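With the registry in place, the cleanNotifyReorgCancel calls deleted above (and in column.go and index.go) need no replacement: the cancel flag is an atomic that is set at most once in a context's lifetime, and removeReorgCtx discards the whole context instead of resetting it for the next job. A sketch of those flag semantics, assuming illustrative names:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// cancelFlag mimics reorgCtx.notifyCancelReorgJob: stored by the
// rollback path, polled by backfill workers. Because every job gets a
// fresh context, there is no "clean" step; the whole struct is dropped
// via the registry's remove instead of being reset for reuse.
type cancelFlag struct {
	v int32
}

func (f *cancelFlag) notifyCancel() { atomic.StoreInt32(&f.v, 1) }

func (f *cancelFlag) isCanceled() bool { return atomic.LoadInt32(&f.v) == 1 }

func main() {
	f := &cancelFlag{}
	fmt.Println(f.isCanceled()) // false
	f.notifyCancel()
	fmt.Println(f.isCanceled()) // true, and it stays true for the job's lifetime
}
```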
diff --git a/ddl/reorg.go b/ddl/reorg.go
index 6f8039c99c..2d7d594b01 100644
--- a/ddl/reorg.go
+++ b/ddl/reorg.go
@@ -106,10 +106,6 @@ func (rc *reorgCtx) notifyReorgCancel() {
 	atomic.StoreInt32(&rc.notifyCancelReorgJob, 1)
 }

-func (rc *reorgCtx) cleanNotifyReorgCancel() {
-	atomic.StoreInt32(&rc.notifyCancelReorgJob, 0)
-}
-
 func (rc *reorgCtx) isReorgCanceled() bool {
 	return atomic.LoadInt32(&rc.notifyCancelReorgJob) == 1
 }
@@ -153,13 +149,6 @@ func (rc *reorgCtx) getRowCountAndKey() (int64, kv.Key, *meta.Element) {
 	return row, h.key, element
 }

-func (rc *reorgCtx) clean() {
-	rc.setRowCount(0)
-	rc.setNextKey(nil)
-	rc.resetWarnings()
-	rc.doneCh = nil
-}
-
 // runReorgJob is used as a portal to do the reorganization work.
 // eg:
 // 1: add index
@@ -196,6 +185,7 @@
 // After that, we can make sure that the worker goroutine is correctly shut down.
 func (w *worker) runReorgJob(t *meta.Meta, reorgInfo *reorgInfo, tblInfo *model.TableInfo, lease time.Duration, f func() error) error {
 	job := reorgInfo.Job
+	d := reorgInfo.d
 	// This is for tests compatible, because most of the early tests try to build the reorg job manually
 	// without reorg meta info, which will cause nil pointer in here.
 	if job.ReorgMeta == nil {
@@ -206,25 +196,28 @@ func (w *worker) runReorgJob(t *meta.Meta, reorgInfo *reorgInfo, tblInfo *model.
 			Location: &model.TimeZoneLocation{Name: time.UTC.String(), Offset: 0},
 		}
 	}

-	if w.reorgCtx.doneCh == nil {
+
+	rc := w.getReorgCtx(job)
+	if rc == nil {
 		// Since the reorg job will be interrupted for polling the cancel action outside, we don't need to wait for 2.5s
 		// for the later entrances.
 		// lease = 0 means it's in an integration test. In this case we don't delay so the test won't run too slowly.
 		if lease > 0 {
 			delayForAsyncCommit()
 		}
-		// start a reorganization job
+		// If the job is cancelling, return ErrCancelledDDLJob directly.
+		// Q: Is it possible for a cancelling job to have no reorgCtx?
+		// A: Yes. Suppose we cancel the job while the last batch of data is being backfilled: the cancel txn is committed first,
+		// then the backfill workers send a signal to the `doneCh` of the reorgCtx, after which the DDL worker removes the reorgCtx
+		// and updates the DDL job to `done`. But at commit time the DDL txn hits a "write conflict" error and retries, leaving a cancelling job with no reorgCtx.
+		if job.IsCancelling() {
+			return dbterror.ErrCancelledDDLJob
+		}
+		rc = w.newReorgCtx(reorgInfo)
 		w.wg.Add(1)
-		w.reorgCtx.doneCh = make(chan error, 1)
-		// initial reorgCtx
-		w.reorgCtx.setRowCount(job.GetRowCount())
-		w.reorgCtx.setNextKey(reorgInfo.StartKey)
-		w.reorgCtx.setCurrentElement(reorgInfo.currElement)
-		w.reorgCtx.mu.warnings = make(map[errors.ErrorID]*terror.Error)
-		w.reorgCtx.mu.warningsCount = make(map[errors.ErrorID]int64)
 		go func() {
 			defer w.wg.Done()
-			w.reorgCtx.doneCh <- f()
+			rc.doneCh <- f()
 		}()
 	}
@@ -240,13 +233,13 @@

 	// wait reorganization job done or timeout
 	select {
-	case err := <-w.reorgCtx.doneCh:
+	case err := <-rc.doneCh:
 		// Since the job is cancelled, we don't care about its partial counts.
-		if w.reorgCtx.isReorgCanceled() || terror.ErrorEqual(err, dbterror.ErrCancelledDDLJob) {
-			w.reorgCtx.clean()
+		if rc.isReorgCanceled() || terror.ErrorEqual(err, dbterror.ErrCancelledDDLJob) {
+			d.removeReorgCtx(job)
 			return dbterror.ErrCancelledDDLJob
 		}
-		rowCount, _, _ := w.reorgCtx.getRowCountAndKey()
+		rowCount, _, _ := rc.getRowCountAndKey()
 		logutil.BgLogger().Info("[ddl] run reorg job done", zap.Int64("handled rows", rowCount))
 		// Update a job's RowCount.
 		job.SetRowCount(rowCount)
@@ -254,7 +247,7 @@

 		// Update a job's warnings.
 		w.mergeWarningsIntoJob(job)

-		w.reorgCtx.clean()
+		d.removeReorgCtx(job)
 		// For other errors, even err is not nil here, we still wait the partial counts to be collected.
 		// since in the next round, the startKey is brand new which is stored by last time.
 		if err != nil {
@@ -273,13 +266,11 @@
 		}
 	case <-w.ctx.Done():
 		logutil.BgLogger().Info("[ddl] run reorg job quit")
-		w.reorgCtx.setNextKey(nil)
-		w.reorgCtx.setRowCount(0)
-		w.reorgCtx.resetWarnings()
+		d.removeReorgCtx(job)
 		// We return dbterror.ErrWaitReorgTimeout here too, so that outer loop will break.
 		return dbterror.ErrWaitReorgTimeout
 	case <-time.After(waitTimeout):
-		rowCount, doneKey, currentElement := w.reorgCtx.getRowCountAndKey()
+		rowCount, doneKey, currentElement := rc.getRowCountAndKey()
 		// Update a job's RowCount.
 		job.SetRowCount(rowCount)
 		updateBackfillProgress(w, reorgInfo, tblInfo, rowCount)
@@ -287,7 +278,7 @@

 		// Update a job's warnings.
 		w.mergeWarningsIntoJob(job)
-		w.reorgCtx.resetWarnings()
+		rc.resetWarnings()

 		// Update a reorgInfo's handle.
 		// Since daemon-worker is triggered by timer to store the info half-way.
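The runReorgJob hunks above keep the established wait-loop shape: f() runs in a goroutine and reports on the buffered doneCh, while the caller selects over completion, worker shutdown, and a timeout that hands control back so progress can be persisted before re-entering. Because doneCh has capacity 1, the reorg goroutine can always deliver its result and exit even if the caller already returned on a timeout; on re-entry, getReorgCtx finds the same context and waits on the same channel. A simplified, runnable sketch of that shape (errWaitTimeout stands in for dbterror.ErrWaitReorgTimeout; all names are illustrative):

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

var errWaitTimeout = errors.New("wait reorg timeout")

// wait mirrors runReorgJob's select: completion, shutdown, or a timeout
// that returns control to the caller so it can persist progress and retry.
func wait(ctx context.Context, doneCh <-chan error, waitTimeout time.Duration) error {
	select {
	case err := <-doneCh:
		return err
	case <-ctx.Done():
		return errors.New("worker shutting down")
	case <-time.After(waitTimeout):
		return errWaitTimeout
	}
}

func main() {
	// doneCh is buffered so the sender never blocks, even if the
	// receiver has already given up on this round.
	doneCh := make(chan error, 1)
	go func() {
		time.Sleep(120 * time.Millisecond) // the "reorg work"
		doneCh <- nil
	}()
	for {
		err := wait(context.Background(), doneCh, 50*time.Millisecond)
		if errors.Is(err, errWaitTimeout) {
			fmt.Println("timeout: persist progress, then re-enter")
			continue
		}
		fmt.Println("reorg finished:", err)
		return
	}
}
```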
@@ -308,11 +299,12 @@
 }

 func (w *worker) mergeWarningsIntoJob(job *model.Job) {
-	w.reorgCtx.mu.Lock()
-	partWarnings := w.reorgCtx.mu.warnings
-	partWarningsCount := w.reorgCtx.mu.warningsCount
+	rc := w.getReorgCtx(job)
+	rc.mu.Lock()
+	defer rc.mu.Unlock()
+	partWarnings := rc.mu.warnings
+	partWarningsCount := rc.mu.warningsCount
 	job.SetWarnings(mergeWarningsAndWarningsCount(partWarnings, job.ReorgMeta.Warnings, partWarningsCount, job.ReorgMeta.WarningsCount))
-	w.reorgCtx.mu.Unlock()
 }

 func updateBackfillProgress(w *worker, reorgInfo *reorgInfo, tblInfo *model.TableInfo,
@@ -362,20 +354,20 @@ func getTableTotalCount(w *worker, tblInfo *model.TableInfo) int64 {
 	return rows[0].GetInt64(0)
 }

-func (w *worker) isReorgRunnable(d *ddlCtx) error {
+func (w *worker) isReorgRunnable(job *model.Job) error {
 	if isChanClosed(w.ctx.Done()) {
 		// Worker is closed. So it can't do the reorganization job.
 		return dbterror.ErrInvalidWorker.GenWithStack("worker is closed")
 	}

-	if w.reorgCtx.isReorgCanceled() {
+	if w.getReorgCtx(job).isReorgCanceled() {
 		// Job is cancelled. So it can't be done.
 		return dbterror.ErrCancelledDDLJob
 	}

-	if !d.isOwner() {
+	if !w.isOwner() {
 		// If it's not the owner, we will try later, so here just returns an error.
-		logutil.BgLogger().Info("[ddl] DDL worker is not the DDL owner", zap.String("ID", d.uuid))
+		logutil.BgLogger().Info("[ddl] DDL worker is not the DDL owner", zap.String("ID", w.uuid))
 		return errors.Trace(dbterror.ErrNotOwner)
 	}
 	return nil
diff --git a/ddl/rollingback.go b/ddl/rollingback.go
index 7ff3c60889..659cf146e6 100644
--- a/ddl/rollingback.go
+++ b/ddl/rollingback.go
@@ -115,7 +115,7 @@ func rollingbackModifyColumn(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job)
 	if job.SchemaState == model.StateWriteReorganization && job.SnapshotVer != 0 {
 		// column type change workers are started. We have to ask them to exit.
 		logutil.Logger(w.logCtx).Info("[ddl] run the cancelling DDL job", zap.String("job", job.String()))
-		w.reorgCtx.notifyReorgCancel()
+		d.notifyReorgCancel(job)
 		// Give this kind of ddl one more round to run; the dbterror.ErrCancelledDDLJob should be fetched from the bottom up.
 		return w.onModifyColumn(d, t, job)
 	}
@@ -329,7 +329,7 @@ func rollingbackAddIndex(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job, isP
 	if job.SchemaState == model.StateWriteReorganization && job.SnapshotVer != 0 {
 		// add index workers are started. We need to ask them to exit.
 		logutil.Logger(w.logCtx).Info("[ddl] run the cancelling DDL job", zap.String("job", job.String()))
-		w.reorgCtx.notifyReorgCancel()
+		d.notifyReorgCancel(job)
 		ver, err = w.onCreateIndex(d, t, job, isPK)
 	} else {
 		// add index workers are not started, remove the indexInfo in tableInfo.
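One last detail, from the mergeWarningsIntoJob hunk above: the function now resolves the per-job context and releases its mutex with defer, so every return path unlocks. The sketch below reduces the merge-under-lock idea to plain counts (the real code also merges *terror.Error values keyed by errors.ErrorID, and resets the partial map separately via resetWarnings; all names here are illustrative):

```go
package main

import (
	"fmt"
	"sync"
)

// mergeCounts folds the partial warning counts collected by backfill
// workers into the job's running totals, in the spirit of
// mergeWarningsAndWarningsCount. String keys stand in for errors.ErrorID.
func mergeCounts(part, total map[string]int64) map[string]int64 {
	for id, n := range part {
		total[id] += n
	}
	return total
}

type reorgWarnings struct {
	mu     sync.Mutex
	counts map[string]int64
}

// drainInto is the sketch's analogue of mergeWarningsIntoJob: take the
// lock, merge the partial counts into the job, and start the next
// interval clean. The deferred unlock guarantees release on every path.
func (w *reorgWarnings) drainInto(job map[string]int64) {
	w.mu.Lock()
	defer w.mu.Unlock()
	mergeCounts(w.counts, job)
	w.counts = make(map[string]int64)
}

func main() {
	w := &reorgWarnings{counts: map[string]int64{"8200": 2}}
	job := map[string]int64{"8200": 3}
	w.drainInto(job)
	fmt.Println(job["8200"]) // 5
}
```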