ddl: delay before changing column from null to not null (#23364)
* ddl: delay before changing column from null to not null Signed-off-by: Yilin Chen <sticnarf@gmail.com> Co-authored-by: Ti Chi Robot <71242396+ti-chi-bot@users.noreply.github.com> Co-authored-by: Arenatlx <314806019@qq.com>
This commit is contained in:
@ -825,7 +825,7 @@ func (w *worker) onModifyColumn(d *ddlCtx, t *meta.Meta, job *model.Job) (ver in
|
||||
}
|
||||
|
||||
if !needChangeColumnData(oldCol, jobParam.newCol) {
|
||||
return w.doModifyColumn(t, job, dbInfo, tblInfo, jobParam.newCol, oldCol, jobParam.pos)
|
||||
return w.doModifyColumn(d, t, job, dbInfo, tblInfo, jobParam.newCol, oldCol, jobParam.pos)
|
||||
}
|
||||
|
||||
if jobParam.changingCol == nil {
|
||||
@ -1384,11 +1384,18 @@ func updateChangingInfo(changingCol *model.ColumnInfo, changingIdxs []*model.Ind
|
||||
|
||||
// doModifyColumn updates the column information and reorders all columns. It does not support modifying column data.
|
||||
func (w *worker) doModifyColumn(
|
||||
t *meta.Meta, job *model.Job, dbInfo *model.DBInfo, tblInfo *model.TableInfo,
|
||||
d *ddlCtx, t *meta.Meta, job *model.Job, dbInfo *model.DBInfo, tblInfo *model.TableInfo,
|
||||
newCol, oldCol *model.ColumnInfo, pos *ast.ColumnPosition) (ver int64, _ error) {
|
||||
// Column from null to not null.
|
||||
if !mysql.HasNotNullFlag(oldCol.Flag) && mysql.HasNotNullFlag(newCol.Flag) {
|
||||
noPreventNullFlag := !mysql.HasPreventNullInsertFlag(oldCol.Flag)
|
||||
|
||||
// lease = 0 means it's in an integration test. In this case we don't delay so the test won't run too slowly.
|
||||
// We need to check after the flag is set
|
||||
if d.lease > 0 && !noPreventNullFlag {
|
||||
delayForAsyncCommit()
|
||||
}
|
||||
|
||||
// Introduce the `mysql.PreventNullInsertFlag` flag to prevent users from inserting or updating null values.
|
||||
err := modifyColsFromNull2NotNull(w, dbInfo, tblInfo, []*model.ColumnInfo{oldCol}, newCol.Name, oldCol.Tp != newCol.Tp)
|
||||
if err != nil {
|
||||
|
||||
11
ddl/ddl.go
11
ddl/ddl.go
@ -668,6 +668,17 @@ type RecoverInfo struct {
|
||||
CurAutoRandID int64
|
||||
}
|
||||
|
||||
// delayForAsyncCommit sleeps `SafeWindow + AllowedClockDrift` before a DDL job finishes.
|
||||
// It should be called before any DDL that could break data consistency.
|
||||
// This provides a safe window for async commit and 1PC to commit with an old schema.
|
||||
func delayForAsyncCommit() {
|
||||
cfg := config.GetGlobalConfig().TiKVClient.AsyncCommit
|
||||
duration := cfg.SafeWindow + cfg.AllowedClockDrift
|
||||
logutil.BgLogger().Info("sleep before DDL finishes to make async commit and 1PC safe",
|
||||
zap.Duration("duration", duration))
|
||||
time.Sleep(duration)
|
||||
}
|
||||
|
||||
var (
|
||||
// RunInGoTest is used to identify whether ddl in running in the test.
|
||||
RunInGoTest bool
|
||||
|
||||
@ -26,7 +26,6 @@ import (
|
||||
"github.com/pingcap/parser/model"
|
||||
"github.com/pingcap/parser/mysql"
|
||||
"github.com/pingcap/parser/terror"
|
||||
"github.com/pingcap/tidb/config"
|
||||
"github.com/pingcap/tidb/distsql"
|
||||
"github.com/pingcap/tidb/kv"
|
||||
"github.com/pingcap/tidb/meta"
|
||||
@ -158,15 +157,9 @@ func (rc *reorgCtx) clean() {
|
||||
}
|
||||
|
||||
func (w *worker) runReorgJob(t *meta.Meta, reorgInfo *reorgInfo, tblInfo *model.TableInfo, lease time.Duration, f func() error) error {
|
||||
// Sleep for reorgDelay before doing reorganization.
|
||||
// This provides a safe window for async commit and 1PC to commit with an old schema.
|
||||
// lease = 0 means it's in an integration test. In this case we don't delay so the test won't run too slowly.
|
||||
if lease > 0 {
|
||||
cfg := config.GetGlobalConfig().TiKVClient.AsyncCommit
|
||||
reorgDelay := cfg.SafeWindow + cfg.AllowedClockDrift
|
||||
logutil.BgLogger().Info("sleep before reorganization to make async commit safe",
|
||||
zap.Duration("duration", reorgDelay))
|
||||
time.Sleep(reorgDelay)
|
||||
delayForAsyncCommit()
|
||||
}
|
||||
|
||||
job := reorgInfo.Job
|
||||
|
||||
@ -1001,7 +1001,9 @@ func (c *twoPhaseCommitter) execute(ctx context.Context) (err error) {
|
||||
}
|
||||
}
|
||||
|
||||
failpoint.Inject("beforePrewrite", nil)
|
||||
if c.sessionID > 0 {
|
||||
failpoint.Inject("beforePrewrite", nil)
|
||||
}
|
||||
|
||||
c.prewriteStarted = true
|
||||
var binlogChan <-chan BinlogWriteResult
|
||||
|
||||
Reference in New Issue
Block a user