From 129dadf4d0d90c8d7288d4e60cec511e725453fa Mon Sep 17 00:00:00 2001
From: wjHuang
Date: Wed, 23 Feb 2022 16:41:43 +0800
Subject: [PATCH] ddl: fix data race for ddl seq (#32542)

close pingcap/tidb#32541
---
 ddl/ddl.go        | 11 ++++++++---
 ddl/ddl_worker.go | 21 ++++++++++-----------
 2 files changed, 18 insertions(+), 14 deletions(-)

diff --git a/ddl/ddl.go b/ddl/ddl.go
index 48e9d5042a..b66e9fd0c4 100644
--- a/ddl/ddl.go
+++ b/ddl/ddl.go
@@ -217,6 +217,11 @@ type ddlCtx struct {
         hook        Callback
         interceptor Interceptor
     }
+
+    ddlSeqNumMu struct {
+        sync.Mutex
+        seqNum uint64
+    }
 }
 
 func (dc *ddlCtx) isOwner() bool {
@@ -379,8 +384,8 @@ func (d *ddl) Start(ctxPool *pools.ResourcePool) error {
     d.workers = make(map[workerType]*worker, 2)
     d.sessPool = newSessionPool(ctxPool)
     d.delRangeMgr = d.newDeleteRangeManager(ctxPool == nil)
-    d.workers[generalWorker] = newWorker(d.ctx, generalWorker, d.sessPool, d.delRangeMgr)
-    d.workers[addIdxWorker] = newWorker(d.ctx, addIdxWorker, d.sessPool, d.delRangeMgr)
+    d.workers[generalWorker] = newWorker(d.ctx, generalWorker, d.sessPool, d.delRangeMgr, d.ddlCtx)
+    d.workers[addIdxWorker] = newWorker(d.ctx, addIdxWorker, d.sessPool, d.delRangeMgr, d.ddlCtx)
     for _, worker := range d.workers {
         worker.wg.Add(1)
         w := worker
@@ -395,7 +400,7 @@ func (d *ddl) Start(ctxPool *pools.ResourcePool) error {
 
     err = kv.RunInNewTxn(d.ctx, d.store, true, func(ctx context.Context, txn kv.Transaction) error {
         t := meta.NewMeta(txn)
-        globalSeqNum, err = t.GetHistoryDDLCount()
+        d.ddlSeqNumMu.seqNum, err = t.GetHistoryDDLCount()
         return err
     })
     if err != nil {
diff --git a/ddl/ddl_worker.go b/ddl/ddl_worker.go
index 7885009b71..ae0348efd2 100644
--- a/ddl/ddl_worker.go
+++ b/ddl/ddl_worker.go
@@ -53,9 +53,6 @@ var (
     ddlWorkerID = int32(0)
     // WaitTimeWhenErrorOccurred is waiting interval when processing DDL jobs encounter errors.
     WaitTimeWhenErrorOccurred = int64(1 * time.Second)
-
-    ddlSeqNumMu  sync.Mutex
-    globalSeqNum uint64
 )
 
 // GetWaitTimeWhenErrorOccurred return waiting interval when processing DDL jobs encounter errors.
@@ -97,6 +94,7 @@ type worker struct {
     logCtx          context.Context
     lockSeqNum      bool
 
+    *ddlCtx
     ddlJobCache
 }
 
@@ -109,7 +107,7 @@ type ddlJobCache struct {
     cacheDigest        *parser.Digest
 }
 
-func newWorker(ctx context.Context, tp workerType, sessPool *sessionPool, delRangeMgr delRangeManager) *worker {
+func newWorker(ctx context.Context, tp workerType, sessPool *sessionPool, delRangeMgr delRangeManager, dCtx *ddlCtx) *worker {
     worker := &worker{
         id: atomic.AddInt32(&ddlWorkerID, 1),
         tp: tp,
@@ -121,6 +119,7 @@ func newWorker(ctx context.Context, tp workerType, sessPool *sessionPool, delRan
             cacheNormalizedSQL: "",
             cacheDigest:        nil,
         },
+        ddlCtx:          dCtx,
         reorgCtx:        &reorgCtx{notifyCancelReorgJob: 0},
         sessPool:        sessPool,
         delRangeManager: delRangeMgr,
@@ -450,10 +449,10 @@ func (w *worker) finishDDLJob(t *meta.Meta, job *model.Job) (err error) {
 }
 
 func (w *worker) writeDDLSeqNum(job *model.Job) {
-    ddlSeqNumMu.Lock()
+    w.ddlSeqNumMu.Lock()
+    w.ddlSeqNumMu.seqNum++
     w.lockSeqNum = true
-    globalSeqNum++
-    job.SeqNum = globalSeqNum
+    job.SeqNum = w.ddlSeqNumMu.seqNum
 }
 
 func finishRecoverTable(w *worker, job *model.Job) error {
@@ -605,10 +604,10 @@ func (w *worker) handleDDLJobQueue(d *ddlCtx) error {
 
             if err != nil {
                 if w.lockSeqNum {
-                    // txn commit failed, we should reset globalSeqNum
-                    globalSeqNum--
+                    // txn commit failed, we should reset seqNum.
+                    w.ddlSeqNumMu.seqNum--
                     w.lockSeqNum = false
-                    ddlSeqNumMu.Unlock()
+                    w.ddlSeqNumMu.Unlock()
                 }
                 return errors.Trace(err)
             } else if job == nil {
@@ -617,7 +616,7 @@ func (w *worker) handleDDLJobQueue(d *ddlCtx) error {
             }
             if w.lockSeqNum {
                 w.lockSeqNum = false
-                ddlSeqNumMu.Unlock()
+                d.ddlSeqNumMu.Unlock()
             }
             w.waitDependencyJobFinished(job, &waitDependencyJobCnt)
 
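The core of the fix above: instead of the package-level ddlSeqNumMu/globalSeqNum pair in ddl_worker.go, the mutex and the sequence number now live together in a ddlSeqNumMu field on ddlCtx, and every worker reaches that single instance through an embedded *ddlCtx handed to newWorker. The standalone Go sketch below illustrates only this sharing pattern; the names allocSeqNum and the main driver are hypothetical, and it simplifies the real flow, where writeDDLSeqNum keeps the mutex held (tracked by lockSeqNum) until the job's transaction commits or fails.

// seqnum_sketch.go: illustrative only, not TiDB code.
package main

import (
    "fmt"
    "sync"
)

// ddlCtx mirrors the patch's idea: the mutex and the value it guards are
// declared together in one shared struct instead of as package-level globals.
type ddlCtx struct {
    ddlSeqNumMu struct {
        sync.Mutex
        seqNum uint64
    }
}

// worker embeds *ddlCtx, so w.ddlSeqNumMu resolves to the shared field,
// just as the patched worker does after newWorker stores dCtx.
type worker struct {
    id int
    *ddlCtx
}

// allocSeqNum is a hypothetical stand-in for writeDDLSeqNum: it bumps the
// shared counter under the shared lock and returns the new value. Unlike the
// real code, it releases the lock immediately rather than waiting for the
// job's transaction to commit.
func (w *worker) allocSeqNum() uint64 {
    w.ddlSeqNumMu.Lock()
    defer w.ddlSeqNumMu.Unlock()
    w.ddlSeqNumMu.seqNum++
    return w.ddlSeqNumMu.seqNum
}

func main() {
    shared := &ddlCtx{} // one ddlCtx handed to every worker
    var wg sync.WaitGroup
    for i := 0; i < 2; i++ { // mirrors generalWorker + addIdxWorker
        w := &worker{id: i, ddlCtx: shared}
        wg.Add(1)
        go func() {
            defer wg.Done()
            for j := 0; j < 1000; j++ {
                w.allocSeqNum()
            }
        }()
    }
    wg.Wait()
    // Both goroutines locked the same mutex, so the result is exactly 2000
    // and `go run -race` reports no data race on seqNum.
    fmt.Println("final seqNum:", shared.ddlSeqNumMu.seqNum)
}

Passing d.ddlCtx into newWorker in ddl.Start is what guarantees the general and add-index workers share one ddlSeqNumMu, which is why the globals could be deleted.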