ddl: make reorgCtx maintained by ddlCtx (#33878)

close pingcap/tidb#34121
xiongjiwei
2022-05-26 13:40:47 +08:00
committed by GitHub
parent fc1065032b
commit dba4dab5d5
9 changed files with 112 additions and 91 deletions
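The heart of the change: instead of a single reorgCtx owned by each DDL worker (and reset between jobs), ddlCtx now keeps a lock-protected map from job ID to reorgCtx. Below is a minimal, self-contained sketch of that registry pattern; the types and the main function are simplified stand-ins for illustration only, not the TiDB code itself, which follows in the diff.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// reorgCtx is a trimmed stand-in for the per-job reorg state.
type reorgCtx struct {
	rowCount int64
	doneCh   chan error
}

// ddlCtx is a trimmed stand-in holding the job-keyed registry, as in the diff.
type ddlCtx struct {
	reorgCtx struct {
		sync.RWMutex
		reorgCtxMap map[int64]*reorgCtx
	}
}

func newDDLCtx() *ddlCtx {
	d := &ddlCtx{}
	d.reorgCtx.reorgCtxMap = make(map[int64]*reorgCtx)
	return d
}

// newReorgCtx registers a fresh context when a job's reorg starts.
func (d *ddlCtx) newReorgCtx(jobID int64) *reorgCtx {
	rc := &reorgCtx{doneCh: make(chan error, 1)}
	d.reorgCtx.Lock()
	defer d.reorgCtx.Unlock()
	d.reorgCtx.reorgCtxMap[jobID] = rc
	return rc
}

// getReorgCtx resolves a job's context under a read lock.
func (d *ddlCtx) getReorgCtx(jobID int64) *reorgCtx {
	d.reorgCtx.RLock()
	defer d.reorgCtx.RUnlock()
	return d.reorgCtx.reorgCtxMap[jobID]
}

// removeReorgCtx drops the context when the job ends, replacing the old
// "clean the shared context" bookkeeping.
func (d *ddlCtx) removeReorgCtx(jobID int64) {
	d.reorgCtx.Lock()
	defer d.reorgCtx.Unlock()
	delete(d.reorgCtx.reorgCtxMap, jobID)
}

func main() {
	d := newDDLCtx()
	rc := d.newReorgCtx(1)
	atomic.AddInt64(&rc.rowCount, 100)
	fmt.Println(atomic.LoadInt64(&d.getReorgCtx(1).rowCount)) // 100
	d.removeReorgCtx(1)
	fmt.Println(d.getReorgCtx(1) == nil) // true
}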

View File

@ -146,6 +146,7 @@ type backfillTaskContext struct {
type backfillWorker struct {
id int
ddlWorker *worker
reorgInfo *reorgInfo
batchCnt int
sessCtx sessionctx.Context
taskCh chan *reorgBackfillTask
@ -155,11 +156,12 @@ type backfillWorker struct {
priority int
}
func newBackfillWorker(sessCtx sessionctx.Context, worker *worker, id int, t table.PhysicalTable) *backfillWorker {
func newBackfillWorker(sessCtx sessionctx.Context, worker *worker, id int, t table.PhysicalTable, reorgInfo *reorgInfo) *backfillWorker {
return &backfillWorker{
id: id,
table: t,
ddlWorker: worker,
reorgInfo: reorgInfo,
batchCnt: int(variable.GetDDLReorgBatchSize()),
sessCtx: sessCtx,
taskCh: make(chan *reorgBackfillTask, 1),
@ -234,13 +236,14 @@ func (w *backfillWorker) handleBackfillTask(d *ddlCtx, task *reorgBackfillTask,
lastLogCount := 0
lastLogTime := time.Now()
startTime := lastLogTime
rc := d.getReorgCtx(w.reorgInfo.Job)
for {
// Give the job a chance to be canceled. If we don't check it here and a panic
// happens in bf.BackfillDataInTxn, the job will never be canceled.
// Because reorgRecordTask may run for a long time,
// we should check whether this DDL job is still runnable.
err := w.ddlWorker.isReorgRunnable(d)
err := w.ddlWorker.isReorgRunnable(w.reorgInfo.Job)
if err != nil {
result.err = err
return result
@ -263,8 +266,8 @@ func (w *backfillWorker) handleBackfillTask(d *ddlCtx, task *reorgBackfillTask,
// small ranges. This will cause the `redo` action in reorganization.
// So for the added count and warnings collection, it is recommended to collect the statistics in every
// successfully committed small range rather than fetching them from the total result.
w.ddlWorker.reorgCtx.increaseRowCount(int64(taskCtx.addedCount))
w.ddlWorker.reorgCtx.mergeWarnings(taskCtx.warnings, taskCtx.warningsCount)
rc.increaseRowCount(int64(taskCtx.addedCount))
rc.mergeWarnings(taskCtx.warnings, taskCtx.warningsCount)
if num := result.scanCount - lastLogCount; num >= 30000 {
lastLogCount = result.scanCount
@ -399,7 +402,7 @@ func (w *worker) handleReorgTasks(reorgInfo *reorgInfo, totalAddedCount *int64,
nextKey, taskAddedCount, err := w.waitTaskResults(workers, taskCnt, totalAddedCount, startKey)
elapsedTime := time.Since(startTime)
if err == nil {
err = w.isReorgRunnable(reorgInfo.d)
err = w.isReorgRunnable(reorgInfo.Job)
}
if err != nil {
@ -420,7 +423,7 @@ func (w *worker) handleReorgTasks(reorgInfo *reorgInfo, totalAddedCount *int64,
}
// nextHandle will be updated periodically in runReorgJob, so no need to update it here.
w.reorgCtx.setNextKey(nextKey)
w.getReorgCtx(reorgInfo.Job).setNextKey(nextKey)
metrics.BatchAddIdxHistogram.WithLabelValues(metrics.LblOK).Observe(elapsedTime.Seconds())
logutil.BgLogger().Info("[ddl] backfill workers successfully processed batch",
zap.ByteString("elementType", reorgInfo.currElement.TypeKey),
@ -583,7 +586,7 @@ func (w *worker) writePhysicalTableRecord(t table.PhysicalTable, bfWorkerType ba
return errors.Trace(err)
}
if err := w.isReorgRunnable(reorgInfo.d); err != nil {
if err := w.isReorgRunnable(reorgInfo.Job); err != nil {
return errors.Trace(err)
}
if startKey == nil && endKey == nil {
@ -642,19 +645,19 @@ func (w *worker) writePhysicalTableRecord(t table.PhysicalTable, bfWorkerType ba
switch bfWorkerType {
case typeAddIndexWorker:
idxWorker := newAddIndexWorker(sessCtx, w, i, t, indexInfo, decodeColMap, reorgInfo.ReorgMeta.SQLMode)
idxWorker := newAddIndexWorker(sessCtx, w, i, t, indexInfo, decodeColMap, reorgInfo)
idxWorker.priority = job.Priority
backfillWorkers = append(backfillWorkers, idxWorker.backfillWorker)
go idxWorker.backfillWorker.run(reorgInfo.d, idxWorker, job)
case typeUpdateColumnWorker:
// Setting InCreateOrAlterStmt tells the difference between SELECT casting and ALTER COLUMN casting.
sessCtx.GetSessionVars().StmtCtx.InCreateOrAlterStmt = true
updateWorker := newUpdateColumnWorker(sessCtx, w, i, t, oldColInfo, colInfo, decodeColMap, reorgInfo.ReorgMeta.SQLMode)
updateWorker := newUpdateColumnWorker(sessCtx, w, i, t, oldColInfo, colInfo, decodeColMap, reorgInfo)
updateWorker.priority = job.Priority
backfillWorkers = append(backfillWorkers, updateWorker.backfillWorker)
go updateWorker.backfillWorker.run(reorgInfo.d, updateWorker, job)
case typeCleanUpIndexWorker:
idxWorker := newCleanUpIndexWorker(sessCtx, w, i, t, decodeColMap, reorgInfo.ReorgMeta.SQLMode)
idxWorker := newCleanUpIndexWorker(sessCtx, w, i, t, decodeColMap, reorgInfo)
idxWorker.priority = job.Priority
backfillWorkers = append(backfillWorkers, idxWorker.backfillWorker)
go idxWorker.backfillWorker.run(reorgInfo.d, idxWorker, job)
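Passing reorgInfo into every backfill worker is what makes the per-job lookup work: all workers spawned for one job resolve the same reorgCtx through the job and accumulate into the same counters (the rc.increaseRowCount / rc.mergeWarnings calls above). A small illustrative sketch of that sharing, with simplified types and a made-up job ID:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// A trimmed-down reorg context shared by all backfill workers of one job.
type reorgCtx struct{ rowCount int64 }

func (rc *reorgCtx) increaseRowCount(n int64) { atomic.AddInt64(&rc.rowCount, n) }

func main() {
	// Registry: job ID -> context; both workers below belong to job 7.
	var mu sync.RWMutex
	registry := map[int64]*reorgCtx{7: {}}
	getReorgCtx := func(jobID int64) *reorgCtx {
		mu.RLock()
		defer mu.RUnlock()
		return registry[jobID]
	}

	var wg sync.WaitGroup
	for w := 0; w < 2; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			rc := getReorgCtx(7) // each worker resolves the same context via its job
			for i := 0; i < 1000; i++ {
				rc.increaseRowCount(1)
			}
		}()
	}
	wg.Wait()
	fmt.Println(atomic.LoadInt64(&registry[7].rowCount)) // 2000
}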

View File

@ -1060,8 +1060,6 @@ func doReorgWorkForModifyColumn(w *worker, d *ddlCtx, t *meta.Meta, job *model.J
return false, ver, nil
}
if kv.IsTxnRetryableError(err) {
// Clean up the channel of notifyCancelReorgJob. Make sure it can't affect other jobs.
w.reorgCtx.cleanNotifyReorgCancel()
return false, ver, errors.Trace(err)
}
if err1 := t.RemoveDDLReorgHandle(job, reorgInfo.elements); err1 != nil {
@ -1070,12 +1068,8 @@ func doReorgWorkForModifyColumn(w *worker, d *ddlCtx, t *meta.Meta, job *model.J
}
logutil.BgLogger().Warn("[ddl] run modify column job failed, convert job to rollback", zap.String("job", job.String()), zap.Error(err))
job.State = model.JobStateRollingback
// Clean up the channel of notifyCancelReorgJob. Make sure it can't affect other jobs.
w.reorgCtx.cleanNotifyReorgCancel()
return false, ver, errors.Trace(err)
}
// Clean up the channel of notifyCancelReorgJob. Make sure it can't affect other jobs.
w.reorgCtx.cleanNotifyReorgCancel()
return true, ver, nil
}
@ -1235,7 +1229,7 @@ func (w *worker) updateColumnAndIndexes(t table.Table, oldCol, col *model.Column
TestReorgGoroutineRunning <- a
for {
time.Sleep(30 * time.Millisecond)
if w.reorgCtx.isReorgCanceled() {
if w.getReorgCtx(reorgInfo.Job).isReorgCanceled() {
// Job is cancelled. So it can't be done.
failpoint.Return(dbterror.ErrCancelledDDLJob)
}
@ -1281,7 +1275,7 @@ func (w *worker) updateColumnAndIndexes(t table.Table, oldCol, col *model.Column
}
// Update the element in the reorgCtx to keep the atomic access for daemon-worker.
w.reorgCtx.setCurrentElement(reorgInfo.elements[i+1])
w.getReorgCtx(reorgInfo.Job).setCurrentElement(reorgInfo.elements[i+1])
// Update the element in the reorgInfo for updating the reorg meta below.
reorgInfo.currElement = reorgInfo.elements[i+1]
@ -1320,16 +1314,16 @@ type updateColumnWorker struct {
sqlMode mysql.SQLMode
}
func newUpdateColumnWorker(sessCtx sessionctx.Context, worker *worker, id int, t table.PhysicalTable, oldCol, newCol *model.ColumnInfo, decodeColMap map[int64]decoder.Column, sqlMode mysql.SQLMode) *updateColumnWorker {
func newUpdateColumnWorker(sessCtx sessionctx.Context, worker *worker, id int, t table.PhysicalTable, oldCol, newCol *model.ColumnInfo, decodeColMap map[int64]decoder.Column, reorgInfo *reorgInfo) *updateColumnWorker {
rowDecoder := decoder.NewRowDecoder(t, t.WritableCols(), decodeColMap)
return &updateColumnWorker{
backfillWorker: newBackfillWorker(sessCtx, worker, id, t),
backfillWorker: newBackfillWorker(sessCtx, worker, id, t, reorgInfo),
oldColInfo: oldCol,
newColInfo: newCol,
metricCounter: metrics.BackfillTotalCounter.WithLabelValues("update_col_rate"),
rowDecoder: rowDecoder,
rowMap: make(map[int64]types.Datum, len(decodeColMap)),
sqlMode: sqlMode,
sqlMode: reorgInfo.ReorgMeta.SQLMode,
}
}

View File

@ -41,6 +41,7 @@ import (
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/parser/terror"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/binloginfo"
"github.com/pingcap/tidb/sessionctx/variable"
@ -222,6 +223,13 @@ type ddlCtx struct {
tableLockCkr util.DeadTableLockChecker
etcdCli *clientv3.Client
// reorgCtx is used for reorganization.
reorgCtx struct {
sync.RWMutex
// reorgCtxMap maps job ID to reorg context.
reorgCtxMap map[int64]*reorgCtx
}
// hook may be modified.
mu struct {
sync.RWMutex
@ -244,6 +252,41 @@ func (dc *ddlCtx) isOwner() bool {
return isOwner
}
func (dc *ddlCtx) getReorgCtx(job *model.Job) *reorgCtx {
dc.reorgCtx.RLock()
defer dc.reorgCtx.RUnlock()
return dc.reorgCtx.reorgCtxMap[job.ID]
}
func (dc *ddlCtx) newReorgCtx(r *reorgInfo) *reorgCtx {
rc := &reorgCtx{}
rc.doneCh = make(chan error, 1)
// Initialize the reorgCtx.
rc.setRowCount(r.Job.GetRowCount())
rc.setNextKey(r.StartKey)
rc.setCurrentElement(r.currElement)
rc.mu.warnings = make(map[errors.ErrorID]*terror.Error)
rc.mu.warningsCount = make(map[errors.ErrorID]int64)
dc.reorgCtx.Lock()
defer dc.reorgCtx.Unlock()
dc.reorgCtx.reorgCtxMap[r.Job.ID] = rc
return rc
}
func (dc *ddlCtx) removeReorgCtx(job *model.Job) {
dc.reorgCtx.Lock()
defer dc.reorgCtx.Unlock()
delete(dc.reorgCtx.reorgCtxMap, job.ID)
}
func (dc *ddlCtx) notifyReorgCancel(job *model.Job) {
rc := dc.getReorgCtx(job)
if rc == nil {
return
}
rc.notifyReorgCancel()
}
// EnableTiFlashPoll enables TiFlash poll loop aka PollTiFlashReplicaStatus.
func EnableTiFlashPoll(d interface{}) {
if dd, ok := d.(*ddl); ok {
@ -342,6 +385,7 @@ func newDDL(ctx context.Context, options ...Option) *ddl {
tableLockCkr: deadLockCkr,
etcdCli: opt.EtcdCli,
}
ddlCtx.reorgCtx.reorgCtxMap = make(map[int64]*reorgCtx)
ddlCtx.mu.hook = opt.Hook
ddlCtx.mu.interceptor = &BaseInterceptor{}
d := &ddl{
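The new notifyReorgCancel added here is nil-safe: if the job has no reorgCtx on this node (its reorg has not started, or the context has already been removed), cancellation is a no-op rather than a write to a flag shared with other jobs. A hedged sketch of that setter, with simplified types (the real method takes a *model.Job):

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

type reorgCtx struct{ notifyCancelReorgJob int32 }

func (rc *reorgCtx) notifyReorgCancel()    { atomic.StoreInt32(&rc.notifyCancelReorgJob, 1) }
func (rc *reorgCtx) isReorgCanceled() bool { return atomic.LoadInt32(&rc.notifyCancelReorgJob) == 1 }

type ddlCtx struct {
	mu          sync.RWMutex
	reorgCtxMap map[int64]*reorgCtx
}

// notifyReorgCancel mirrors the shape of the method in the diff: look the
// context up by job ID, and only set the cancel flag if it exists.
func (d *ddlCtx) notifyReorgCancel(jobID int64) {
	d.mu.RLock()
	rc := d.reorgCtxMap[jobID]
	d.mu.RUnlock()
	if rc == nil {
		return
	}
	rc.notifyReorgCancel()
}

func main() {
	d := &ddlCtx{reorgCtxMap: map[int64]*reorgCtx{42: {}}}
	d.notifyReorgCancel(42) // running job: its flag is set
	d.notifyReorgCancel(99) // unknown job: safe no-op
	fmt.Println(d.reorgCtxMap[42].isReorgCanceled()) // true
}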

View File

@ -562,7 +562,7 @@ func TestReorg(t *testing.T) {
require.Equal(t, ctx.Value(testCtxKey), 1)
ctx.ClearValue(testCtxKey)
err = sessiontxn.NewTxn(context.Background(), ctx)
err = ctx.NewTxn(context.Background())
require.NoError(t, err)
txn, err := ctx.Txn(true)
require.NoError(t, err)
@ -571,7 +571,7 @@ func TestReorg(t *testing.T) {
err = txn.Rollback()
require.NoError(t, err)
err = sessiontxn.NewTxn(context.Background(), ctx)
err = ctx.NewTxn(context.Background())
require.NoError(t, err)
txn, err = ctx.Txn(true)
require.NoError(t, err)
@ -582,17 +582,11 @@ func TestReorg(t *testing.T) {
rowCount := int64(10)
handle := test.handle
f := func() error {
d.generalWorker().reorgCtx.setRowCount(rowCount)
d.generalWorker().reorgCtx.setNextKey(handle.Encoded())
time.Sleep(1*ReorgWaitTimeout + 100*time.Millisecond)
return nil
}
job := &model.Job{
ID: 1,
SnapshotVer: 1, // Make sure it is not zero. So the reorgInfo's first is false.
}
err = sessiontxn.NewTxn(context.Background(), ctx)
err = ctx.NewTxn(context.Background())
require.NoError(t, err)
txn, err = ctx.Txn(true)
require.NoError(t, err)
@ -601,6 +595,13 @@ func TestReorg(t *testing.T) {
rInfo := &reorgInfo{
Job: job,
currElement: e,
d: d.ddlCtx,
}
f := func() error {
d.getReorgCtx(job).setRowCount(rowCount)
d.getReorgCtx(job).setNextKey(handle.Encoded())
time.Sleep(1*ReorgWaitTimeout + 100*time.Millisecond)
return nil
}
mockTbl := tables.MockTableFromMeta(&model.TableInfo{IsCommonHandle: test.isCommonHandle, CommonHandleVersion: 1})
err = d.generalWorker().runReorgJob(m, rInfo, mockTbl.Meta(), d.lease, f)
@ -612,12 +613,11 @@ func TestReorg(t *testing.T) {
err = d.generalWorker().runReorgJob(m, rInfo, mockTbl.Meta(), d.lease, f)
if err == nil {
require.Equal(t, job.RowCount, rowCount)
require.Equal(t, d.generalWorker().reorgCtx.rowCount, int64(0))
// Test whether reorgInfo's Handle is updated.
err = txn.Commit(context.Background())
require.NoError(t, err)
err = sessiontxn.NewTxn(context.Background(), ctx)
err = ctx.NewTxn(context.Background())
require.NoError(t, err)
m = meta.NewMeta(txn)
@ -625,8 +625,6 @@ func TestReorg(t *testing.T) {
require.NoError(t, err1)
require.Equal(t, info.StartKey, kv.Key(handle.Encoded()))
require.Equal(t, info.currElement, e)
_, doneHandle, _ := d.generalWorker().reorgCtx.getRowCountAndKey()
require.Nil(t, doneHandle)
break
}
}

View File

@ -90,7 +90,6 @@ type worker struct {
wg sync.WaitGroup
sessPool *sessionPool // sessPool is used to new sessions to execute SQL in ddl package.
reorgCtx *reorgCtx // reorgCtx is used for reorganization.
delRangeManager delRangeManager
logCtx context.Context
lockSeqNum bool
@ -126,7 +125,6 @@ func newWorker(ctx context.Context, tp workerType, sessPool *sessionPool, delRan
ctx: ctx,
JobContext: NewJobContext(),
ddlCtx: dCtx,
reorgCtx: &reorgCtx{notifyCancelReorgJob: 0},
sessPool: sessPool,
delRangeManager: delRangeMgr,
}

View File

@ -613,12 +613,8 @@ func doReorgWorkForCreateIndex(w *worker, d *ddlCtx, t *meta.Meta, job *model.Jo
logutil.BgLogger().Warn("[ddl] run add index job failed, convert job to rollback, RemoveDDLReorgHandle failed", zap.String("job", job.String()), zap.Error(err1))
}
}
// Clean up the channel of notifyCancelReorgJob. Make sure it can't affect other jobs.
w.reorgCtx.cleanNotifyReorgCancel()
return false, ver, errors.Trace(err)
}
// Clean up the channel of notifyCancelReorgJob. Make sure it can't affect other jobs.
w.reorgCtx.cleanNotifyReorgCancel()
return true, ver, errors.Trace(err)
}
@ -1041,18 +1037,18 @@ type addIndexWorker struct {
distinctCheckFlags []bool
}
func newAddIndexWorker(sessCtx sessionctx.Context, worker *worker, id int, t table.PhysicalTable, indexInfo *model.IndexInfo, decodeColMap map[int64]decoder.Column, sqlMode mysql.SQLMode) *addIndexWorker {
func newAddIndexWorker(sessCtx sessionctx.Context, worker *worker, id int, t table.PhysicalTable, indexInfo *model.IndexInfo, decodeColMap map[int64]decoder.Column, reorgInfo *reorgInfo) *addIndexWorker {
index := tables.NewIndex(t.GetPhysicalID(), t.Meta(), indexInfo)
rowDecoder := decoder.NewRowDecoder(t, t.WritableCols(), decodeColMap)
return &addIndexWorker{
baseIndexWorker: baseIndexWorker{
backfillWorker: newBackfillWorker(sessCtx, worker, id, t),
backfillWorker: newBackfillWorker(sessCtx, worker, id, t, reorgInfo),
indexes: []table.Index{index},
rowDecoder: rowDecoder,
defaultVals: make([]types.Datum, len(t.WritableCols())),
rowMap: make(map[int64]types.Datum, len(decodeColMap)),
metricCounter: metrics.BackfillTotalCounter.WithLabelValues("add_idx_rate"),
sqlMode: sqlMode,
sqlMode: reorgInfo.ReorgMeta.SQLMode,
},
index: index,
}
@ -1486,7 +1482,7 @@ type cleanUpIndexWorker struct {
baseIndexWorker
}
func newCleanUpIndexWorker(sessCtx sessionctx.Context, worker *worker, id int, t table.PhysicalTable, decodeColMap map[int64]decoder.Column, sqlMode mysql.SQLMode) *cleanUpIndexWorker {
func newCleanUpIndexWorker(sessCtx sessionctx.Context, worker *worker, id int, t table.PhysicalTable, decodeColMap map[int64]decoder.Column, reorgInfo *reorgInfo) *cleanUpIndexWorker {
indexes := make([]table.Index, 0, len(t.Indices()))
rowDecoder := decoder.NewRowDecoder(t, t.WritableCols(), decodeColMap)
for _, index := range t.Indices() {
@ -1496,13 +1492,13 @@ func newCleanUpIndexWorker(sessCtx sessionctx.Context, worker *worker, id int, t
}
return &cleanUpIndexWorker{
baseIndexWorker: baseIndexWorker{
backfillWorker: newBackfillWorker(sessCtx, worker, id, t),
backfillWorker: newBackfillWorker(sessCtx, worker, id, t, reorgInfo),
indexes: indexes,
rowDecoder: rowDecoder,
defaultVals: make([]types.Datum, len(t.WritableCols())),
rowMap: make(map[int64]types.Datum, len(decodeColMap)),
metricCounter: metrics.BackfillTotalCounter.WithLabelValues("cleanup_idx_rate"),
sqlMode: sqlMode,
sqlMode: reorgInfo.ReorgMeta.SQLMode,
},
}
}
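The cleanNotifyReorgCancel deletions in this file (and the matching ones in the column and partition code) fall out of the new design: the cancel flag used to live in a worker-wide reorgCtx that the next job reused, so it had to be zeroed after every reorg; now it lives in a per-job context that removeReorgCtx simply discards. A tiny sketch contrasting the two lifecycles (simplified, illustrative field names):

package main

import "fmt"

type reorgCtx struct{ cancelled bool }

func main() {
	// Old shape (sketch): one shared context reused across jobs; forgetting to
	// reset the flag would cancel the next job by accident.
	shared := &reorgCtx{}
	shared.cancelled = true  // job A was cancelled
	shared.cancelled = false // cleanNotifyReorgCancel had to run before job B

	// New shape (sketch): one context per job ID, deleted when the job ends.
	perJob := map[int64]*reorgCtx{1: {cancelled: true}}
	delete(perJob, 1)       // removeReorgCtx
	perJob[2] = &reorgCtx{} // job B starts from a fresh context
	fmt.Println(perJob[2].cancelled, shared.cancelled) // false false
}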

View File

@ -1185,12 +1185,8 @@ func (w *worker) onDropTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (
// if timeout, we should return, check for the owner and re-wait job done.
return ver, nil
}
// Clean up the channel of notifyCancelReorgJob. Make sure it can't affect other jobs.
w.reorgCtx.cleanNotifyReorgCancel()
return ver, errors.Trace(err)
}
// Clean up the channel of notifyCancelReorgJob. Make sure it can't affect other jobs.
w.reorgCtx.cleanNotifyReorgCancel()
}
tblInfo.Partition.DroppingDefinitions = nil
// used by ApplyDiff in updateSchemaVersion

View File

@ -106,10 +106,6 @@ func (rc *reorgCtx) notifyReorgCancel() {
atomic.StoreInt32(&rc.notifyCancelReorgJob, 1)
}
func (rc *reorgCtx) cleanNotifyReorgCancel() {
atomic.StoreInt32(&rc.notifyCancelReorgJob, 0)
}
func (rc *reorgCtx) isReorgCanceled() bool {
return atomic.LoadInt32(&rc.notifyCancelReorgJob) == 1
}
@ -153,13 +149,6 @@ func (rc *reorgCtx) getRowCountAndKey() (int64, kv.Key, *meta.Element) {
return row, h.key, element
}
func (rc *reorgCtx) clean() {
rc.setRowCount(0)
rc.setNextKey(nil)
rc.resetWarnings()
rc.doneCh = nil
}
// runReorgJob is used as a portal to do the reorganization work.
// eg:
// 1: add index
@ -196,6 +185,7 @@ func (rc *reorgCtx) clean() {
// After that, we can make sure that the worker goroutine is correctly shut down.
func (w *worker) runReorgJob(t *meta.Meta, reorgInfo *reorgInfo, tblInfo *model.TableInfo, lease time.Duration, f func() error) error {
job := reorgInfo.Job
d := reorgInfo.d
// This is for tests compatible, because most of the early tests try to build the reorg job manually
// without reorg meta info, which will cause nil pointer in here.
if job.ReorgMeta == nil {
@ -206,25 +196,28 @@ func (w *worker) runReorgJob(t *meta.Meta, reorgInfo *reorgInfo, tblInfo *model.
Location: &model.TimeZoneLocation{Name: time.UTC.String(), Offset: 0},
}
}
if w.reorgCtx.doneCh == nil {
rc := w.getReorgCtx(job)
if rc == nil {
// Since the reorg job will be interrupted to poll for the cancel action from outside, we don't need to
// wait for 2.5s on later entrances.
// lease = 0 means it's in an integration test. In this case we don't delay so the test won't run too slowly.
if lease > 0 {
delayForAsyncCommit()
}
// start a reorganization job
// If the job is being cancelled, return ErrCancelledDDLJob directly.
// Q: Is it possible that the job is cancelling and has no reorgCtx?
// A: Yes. Consider cancelling the job while the last batch of data is being backfilled: the cancel txn commits first,
// then the backfill workers send a signal to the `doneCh` of the reorgCtx, and the DDL worker removes the reorgCtx and
// updates the DDL job to `done`; but at commit time the DDL txn raises a "write conflict" error and retries, and this case happens.
if job.IsCancelling() {
return dbterror.ErrCancelledDDLJob
}
rc = w.newReorgCtx(reorgInfo)
w.wg.Add(1)
w.reorgCtx.doneCh = make(chan error, 1)
// initial reorgCtx
w.reorgCtx.setRowCount(job.GetRowCount())
w.reorgCtx.setNextKey(reorgInfo.StartKey)
w.reorgCtx.setCurrentElement(reorgInfo.currElement)
w.reorgCtx.mu.warnings = make(map[errors.ErrorID]*terror.Error)
w.reorgCtx.mu.warningsCount = make(map[errors.ErrorID]int64)
go func() {
defer w.wg.Done()
w.reorgCtx.doneCh <- f()
rc.doneCh <- f()
}()
}
@ -240,13 +233,13 @@ func (w *worker) runReorgJob(t *meta.Meta, reorgInfo *reorgInfo, tblInfo *model.
// wait reorganization job done or timeout
select {
case err := <-w.reorgCtx.doneCh:
case err := <-rc.doneCh:
// Since the job is cancelled, we don't care about its partial counts.
if w.reorgCtx.isReorgCanceled() || terror.ErrorEqual(err, dbterror.ErrCancelledDDLJob) {
w.reorgCtx.clean()
if rc.isReorgCanceled() || terror.ErrorEqual(err, dbterror.ErrCancelledDDLJob) {
d.removeReorgCtx(job)
return dbterror.ErrCancelledDDLJob
}
rowCount, _, _ := w.reorgCtx.getRowCountAndKey()
rowCount, _, _ := rc.getRowCountAndKey()
logutil.BgLogger().Info("[ddl] run reorg job done", zap.Int64("handled rows", rowCount))
// Update a job's RowCount.
job.SetRowCount(rowCount)
@ -254,7 +247,7 @@ func (w *worker) runReorgJob(t *meta.Meta, reorgInfo *reorgInfo, tblInfo *model.
// Update a job's warnings.
w.mergeWarningsIntoJob(job)
w.reorgCtx.clean()
d.removeReorgCtx(job)
// For other errors, even if err is not nil here, we still wait for the partial counts to be collected,
// since in the next round the startKey is brand new, having been stored last time.
if err != nil {
@ -273,13 +266,11 @@ func (w *worker) runReorgJob(t *meta.Meta, reorgInfo *reorgInfo, tblInfo *model.
}
case <-w.ctx.Done():
logutil.BgLogger().Info("[ddl] run reorg job quit")
w.reorgCtx.setNextKey(nil)
w.reorgCtx.setRowCount(0)
w.reorgCtx.resetWarnings()
d.removeReorgCtx(job)
// We return dbterror.ErrWaitReorgTimeout here too, so that outer loop will break.
return dbterror.ErrWaitReorgTimeout
case <-time.After(waitTimeout):
rowCount, doneKey, currentElement := w.reorgCtx.getRowCountAndKey()
rowCount, doneKey, currentElement := rc.getRowCountAndKey()
// Update a job's RowCount.
job.SetRowCount(rowCount)
updateBackfillProgress(w, reorgInfo, tblInfo, rowCount)
@ -287,7 +278,7 @@ func (w *worker) runReorgJob(t *meta.Meta, reorgInfo *reorgInfo, tblInfo *model.
// Update a job's warnings.
w.mergeWarningsIntoJob(job)
w.reorgCtx.resetWarnings()
rc.resetWarnings()
// Update a reorgInfo's handle.
// Since daemon-worker is triggered by timer to store the info half-way.
@ -308,11 +299,12 @@ func (w *worker) runReorgJob(t *meta.Meta, reorgInfo *reorgInfo, tblInfo *model.
}
func (w *worker) mergeWarningsIntoJob(job *model.Job) {
w.reorgCtx.mu.Lock()
partWarnings := w.reorgCtx.mu.warnings
partWarningsCount := w.reorgCtx.mu.warningsCount
rc := w.getReorgCtx(job)
rc.mu.Lock()
defer rc.mu.Unlock()
partWarnings := rc.mu.warnings
partWarningsCount := rc.mu.warningsCount
job.SetWarnings(mergeWarningsAndWarningsCount(partWarnings, job.ReorgMeta.Warnings, partWarningsCount, job.ReorgMeta.WarningsCount))
w.reorgCtx.mu.Unlock()
}
func updateBackfillProgress(w *worker, reorgInfo *reorgInfo, tblInfo *model.TableInfo,
@ -362,20 +354,20 @@ func getTableTotalCount(w *worker, tblInfo *model.TableInfo) int64 {
return rows[0].GetInt64(0)
}
func (w *worker) isReorgRunnable(d *ddlCtx) error {
func (w *worker) isReorgRunnable(job *model.Job) error {
if isChanClosed(w.ctx.Done()) {
// Worker is closed. So it can't do the reorganizational job.
return dbterror.ErrInvalidWorker.GenWithStack("worker is closed")
}
if w.reorgCtx.isReorgCanceled() {
if w.getReorgCtx(job).isReorgCanceled() {
// Job is cancelled. So it can't be done.
return dbterror.ErrCancelledDDLJob
}
if !d.isOwner() {
if !w.isOwner() {
// If it's not the owner, we will try later, so here just returns an error.
logutil.BgLogger().Info("[ddl] DDL worker is not the DDL owner", zap.String("ID", d.uuid))
logutil.BgLogger().Info("[ddl] DDL worker is not the DDL owner", zap.String("ID", w.uuid))
return errors.Trace(dbterror.ErrNotOwner)
}
return nil
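The rewritten runReorgJob keeps its overall shape: spawn the backfill function once into a goroutine that reports through the job's doneCh, then select on completion, worker shutdown, or a timeout, and remove the per-job context only on completion or cancellation so a timed-out caller can re-enter later and find the same context. A condensed, standalone sketch of that wait loop (simplified types, a stand-in error value, no meta/handle updates):

package main

import (
	"errors"
	"fmt"
	"time"
)

type reorgCtx struct{ doneCh chan error }

// Stand-in for dbterror.ErrWaitReorgTimeout in this sketch.
var errWaitReorgTimeout = errors.New("wait reorg timeout")

// waitReorg mirrors runReorgJob's select: consume the result from doneCh and
// tear the context down, or time out and let the caller re-enter later.
func waitReorg(registry map[int64]*reorgCtx, jobID int64, f func() error, timeout time.Duration) error {
	rc, ok := registry[jobID]
	if !ok {
		rc = &reorgCtx{doneCh: make(chan error, 1)}
		registry[jobID] = rc
		go func() { rc.doneCh <- f() }()
	}
	select {
	case err := <-rc.doneCh:
		delete(registry, jobID) // removeReorgCtx: the reorg is finished on this node
		return err
	case <-time.After(timeout):
		return errWaitReorgTimeout
	}
}

func main() {
	registry := map[int64]*reorgCtx{}
	slow := func() error { time.Sleep(50 * time.Millisecond); return nil }
	fmt.Println(waitReorg(registry, 1, slow, 10*time.Millisecond)) // times out, context kept
	fmt.Println(waitReorg(registry, 1, slow, time.Second))         // <nil>, context removed
}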

View File

@ -115,7 +115,7 @@ func rollingbackModifyColumn(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job)
if job.SchemaState == model.StateWriteReorganization && job.SnapshotVer != 0 {
// column type change workers are started. we have to ask them to exit.
logutil.Logger(w.logCtx).Info("[ddl] run the cancelling DDL job", zap.String("job", job.String()))
w.reorgCtx.notifyReorgCancel()
d.notifyReorgCancel(job)
// Give this kind of DDL job one more round to run; the dbterror.ErrCancelledDDLJob should be fetched from the bottom up.
return w.onModifyColumn(d, t, job)
}
@ -329,7 +329,7 @@ func rollingbackAddIndex(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job, isP
if job.SchemaState == model.StateWriteReorganization && job.SnapshotVer != 0 {
// add index workers are started. need to ask them to exit.
logutil.Logger(w.logCtx).Info("[ddl] run the cancelling DDL job", zap.String("job", job.String()))
w.reorgCtx.notifyReorgCancel()
d.notifyReorgCancel(job)
ver, err = w.onCreateIndex(d, t, job, isPK)
} else {
// add index workers are not started, remove the indexInfo in tableInfo.
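On the consuming side, isReorgRunnable (changed in reorg.go above to take the job rather than the ddlCtx) is what turns the flag set by d.notifyReorgCancel into dbterror.ErrCancelledDDLJob inside the backfill loop. A rough sketch of that check with simplified types and a stand-in error value:

package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

// Stand-in for dbterror.ErrCancelledDDLJob in this sketch.
var errCancelledDDLJob = errors.New("cancelled DDL job")

type reorgCtx struct{ notifyCancelReorgJob int32 }

func (rc *reorgCtx) isReorgCanceled() bool {
	return atomic.LoadInt32(&rc.notifyCancelReorgJob) == 1
}

// isReorgRunnable sketches the per-job check: every backfill batch asks the
// job's own context whether a cancel has been requested.
func isReorgRunnable(registry map[int64]*reorgCtx, jobID int64) error {
	if rc := registry[jobID]; rc != nil && rc.isReorgCanceled() {
		return errCancelledDDLJob
	}
	return nil
}

func main() {
	registry := map[int64]*reorgCtx{5: {}}
	fmt.Println(isReorgRunnable(registry, 5)) // <nil>: keep backfilling
	atomic.StoreInt32(&registry[5].notifyCancelReorgJob, 1) // the rollback path sets the flag
	fmt.Println(isReorgRunnable(registry, 5)) // cancelled DDL job
}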