ddl: refine param for backfillWorker (#37070)

Author: Hangjie Mo
Date: 2022-08-15 10:12:50 +08:00
Committed by: GitHub
Parent: 1b3c09bba1
Commit: 62acece63c
4 changed files with 89 additions and 31 deletions

build/nogo_config.json

@@ -154,6 +154,9 @@
".*_generated\\.go$": "ignore generated code"
},
"only_files": {
"ddl/backfilling.go": "ddl/backfilling.go",
"ddl/column.go": "ddl/column.go",
"ddl/index.go": "ddl/index.go",
"server/conn.go": "server/conn.go",
"server/conn_stmt.go": "server/conn_stmt.go",
"server/conn_test.go": "server/conn_test.go",
@@ -283,6 +286,9 @@
".*_generated\\.go$": "ignore generated code"
},
"only_files": {
"ddl/backfilling.go": "ddl/backfilling.go",
"ddl/column.go": "ddl/column.go",
"ddl/index.go": "ddl/index.go",
"expression/builtin_cast.go": "expression/builtin_cast code",
"server/conn.go": "server/conn.go",
"server/conn_stmt.go": "server/conn_stmt.go",
@@ -638,6 +644,9 @@
".*_generated\\.go$": "ignore generated code"
},
"only_files": {
"ddl/backfilling.go": "ddl/backfilling.go",
"ddl/column.go": "ddl/column.go",
"ddl/index.go": "ddl/index.go",
"expression/builtin_cast.go": "enable expression/builtin_cast.go",
"planner/core/plan.go": "planner/core/plan.go",
"server/conn.go": "server/conn.go",

ddl/backfilling.go

@@ -164,7 +164,7 @@ func newBackfillWorker(sessCtx sessionctx.Context, id int, t table.PhysicalTable
sessCtx: sessCtx,
taskCh: make(chan *reorgBackfillTask, 1),
resultCh: make(chan *backfillResult, 1),
priority: kv.PriorityLow,
priority: reorgInfo.Job.Priority,
}
}
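The hunk above replaces the hard-coded kv.PriorityLow with the priority carried by the DDL job, so per-worker priority no longer has to be patched after construction (note the `idxWorker.priority = job.Priority` lines removed further down). A minimal sketch of the pattern, with simplified stand-in types rather than the real TiDB ones:

```go
package sketch

// Simplified stand-ins for TiDB's model.Job and reorgInfo.
type job struct{ Priority int }

type reorgCtx struct{ Job *job }

type backfillWorker struct{ priority int }

// newBackfillWorker reads the priority from the reorg context at
// construction time, so callers can no longer forget to override the default.
func newBackfillWorker(ri *reorgCtx) *backfillWorker {
	return &backfillWorker{priority: ri.Job.Priority}
}
```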
@@ -365,7 +365,7 @@ func splitTableRanges(t table.PhysicalTable, store kv.Storage, startKey, endKey
return ranges, nil
}
func (w *worker) waitTaskResults(workers []*backfillWorker, taskCnt int,
func (*worker) waitTaskResults(workers []*backfillWorker, taskCnt int,
totalAddedCount *int64, startKey kv.Key) (kv.Key, int64, error) {
var (
addedCount int64
@@ -585,7 +585,7 @@ func setSessCtxLocation(sctx sessionctx.Context, info *reorgInfo) error {
//
// The above operations are completed in a transaction.
// Finally, update the total number of processed rows, and store the handle value that has been completed.
func (w *worker) writePhysicalTableRecord(t table.PhysicalTable, bfWorkerType backfillWorkerType, indexInfo *model.IndexInfo, oldColInfo, colInfo *model.ColumnInfo, reorgInfo *reorgInfo) error {
func (w *worker) writePhysicalTableRecord(t table.PhysicalTable, bfWorkerType backfillWorkerType, reorgInfo *reorgInfo) error {
job := reorgInfo.Job
totalAddedCount := job.GetRowCount()
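With this signature change, writePhysicalTableRecord no longer threads *model.IndexInfo and *model.ColumnInfo through the call chain; reorgInfo.currElement (an element type key plus an ID) already identifies what is being backfilled, and each worker constructor resolves it locally. A hedged sketch of that lookup, with simplified types in place of TiDB's meta and table packages:

```go
package sketch

import "bytes"

// Simplified stand-ins for meta element keys and table indexes.
var indexElementKey = []byte("_idx_")

type element struct {
	TypeKey []byte
	ID      int64
}

type index struct{ ID int64 }

// resolveIndex mirrors what newAddIndexWorker now does: verify the
// element type, then find the target index by currElement.ID.
func resolveIndex(curr element, indices []index) *index {
	if !bytes.Equal(curr.TypeKey, indexElementKey) {
		return nil // wrong element type; the real code logs an error
	}
	for i := range indices {
		if indices[i].ID == curr.ID {
			return &indices[i]
		}
	}
	return nil
}
```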
@@ -604,6 +604,7 @@ func (w *worker) writePhysicalTableRecord(t table.PhysicalTable, bfWorkerType ba
}
failpoint.Inject("MockCaseWhenParseFailure", func(val failpoint.Value) {
//nolint:forcetypeassert
if val.(bool) {
failpoint.Return(errors.New("job.ErrCount:" + strconv.Itoa(int(job.ErrorCount)) + ", mock unknown type: ast.whenClause."))
}
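The //nolint:forcetypeassert comments added throughout this commit silence the linter on `val.(bool)` assertions inside failpoint callbacks, which only run when the failpoint is enabled. For reference, a sketch of how such a failpoint is typically enabled from a test; the exact failpoint path is an assumption here:

```go
package ddl_test

import (
	"testing"

	"github.com/pingcap/failpoint"
	"github.com/stretchr/testify/require"
)

func TestMockCaseWhenParseFailure(t *testing.T) {
	// Assumed failpoint path: the ddl package import path plus the name.
	fp := "github.com/pingcap/tidb/ddl/MockCaseWhenParseFailure"
	require.NoError(t, failpoint.Enable(fp, `return(true)`))
	defer func() { require.NoError(t, failpoint.Disable(fp)) }()
	// ... run a DDL that reaches writePhysicalTableRecord and assert that
	// the injected "mock unknown type: ast.whenClause" error surfaces.
}
```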
@@ -656,20 +657,17 @@ func (w *worker) writePhysicalTableRecord(t table.PhysicalTable, bfWorkerType ba
switch bfWorkerType {
case typeAddIndexWorker:
idxWorker := newAddIndexWorker(sessCtx, i, t, indexInfo, decodeColMap, reorgInfo, jc)
idxWorker.priority = job.Priority
idxWorker := newAddIndexWorker(sessCtx, i, t, decodeColMap, reorgInfo, jc)
backfillWorkers = append(backfillWorkers, idxWorker.backfillWorker)
go idxWorker.backfillWorker.run(reorgInfo.d, idxWorker, job)
case typeUpdateColumnWorker:
// Setting InCreateOrAlterStmt distinguishes SELECT casting from ALTER COLUMN casting.
sessCtx.GetSessionVars().StmtCtx.InCreateOrAlterStmt = true
updateWorker := newUpdateColumnWorker(sessCtx, i, t, oldColInfo, colInfo, decodeColMap, reorgInfo, jc)
updateWorker.priority = job.Priority
updateWorker := newUpdateColumnWorker(sessCtx, i, t, decodeColMap, reorgInfo, jc)
backfillWorkers = append(backfillWorkers, updateWorker.backfillWorker)
go updateWorker.backfillWorker.run(reorgInfo.d, updateWorker, job)
case typeCleanUpIndexWorker:
idxWorker := newCleanUpIndexWorker(sessCtx, i, t, decodeColMap, reorgInfo, jc)
idxWorker.priority = job.Priority
backfillWorkers = append(backfillWorkers, idxWorker.backfillWorker)
go idxWorker.backfillWorker.run(reorgInfo.d, idxWorker, job)
default:
@@ -684,6 +682,7 @@ func (w *worker) writePhysicalTableRecord(t table.PhysicalTable, bfWorkerType ba
}
failpoint.Inject("checkBackfillWorkerNum", func(val failpoint.Value) {
//nolint:forcetypeassert
if val.(bool) {
num := int(atomic.LoadInt32(&TestCheckWorkerNumber))
if num != 0 {

ddl/column.go

@@ -111,6 +111,7 @@ func onAddColumn(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error)
}
failpoint.Inject("errorBeforeDecodeArgs", func(val failpoint.Value) {
//nolint:forcetypeassert
if val.(bool) {
failpoint.Return(ver, errors.New("occur an error before decode args"))
}
@@ -205,7 +206,7 @@ func setIndicesState(indexInfos []*model.IndexInfo, state model.SchemaState) {
}
}
func checkDropColumnForStatePublic(tblInfo *model.TableInfo, colInfo *model.ColumnInfo) (err error) {
func checkDropColumnForStatePublic(colInfo *model.ColumnInfo) (err error) {
// When the dropping column has the not-null flag and no default value, we can backfill the column value like in "add column".
// NOTE: If the StateWriteOnly state can be rolled back, we'd better reconsider the original default value.
// We also need to consider columns without the not-null flag.
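As the comment explains, dropping a NOT NULL column without a default needs an "original default" so that writes landing during the state change can still produce a value. A hedged sketch of that decision, not the real function body:

```go
package sketch

// ensureBackfillDefault captures the rule in the comment above: only a
// NOT NULL column with no default needs an original default installed
// before it can be dropped safely.
func ensureBackfillDefault(notNull, hasDefault bool, setOriginDefault func() error) error {
	if notNull && !hasDefault {
		return setOriginDefault()
	}
	return nil
}
```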
@@ -250,7 +251,7 @@ func onDropColumn(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error)
colInfo.State = model.StateWriteOnly
setIndicesState(idxInfos, model.StateWriteOnly)
tblInfo.MoveColumnInfo(colInfo.Offset, len(tblInfo.Columns)-1)
err = checkDropColumnForStatePublic(tblInfo, colInfo)
err = checkDropColumnForStatePublic(colInfo)
if err != nil {
return ver, errors.Trace(err)
}
@@ -521,6 +522,7 @@ func (w *worker) onModifyColumn(d *ddlCtx, t *meta.Meta, job *model.Job) (ver in
}
failpoint.Inject("uninitializedOffsetAndState", func(val failpoint.Value) {
//nolint:forcetypeassert
if val.(bool) {
if modifyInfo.newCol.State != model.StatePublic {
failpoint.Return(ver, errors.New("the column state is wrong"))
@@ -690,6 +692,7 @@ func (w *worker) doModifyColumnTypeWithData(
defer w.sessPool.put(sctx)
ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL)
//nolint:forcetypeassert
_, _, err = sctx.(sqlexec.RestrictedSQLExecutor).ExecRestrictedSQL(ctx, nil, valStr)
if err != nil {
job.State = model.JobStateCancelled
@@ -815,7 +818,7 @@ func doReorgWorkForModifyColumn(w *worker, d *ddlCtx, t *meta.Meta, job *model.J
// Use old column name to generate less confusing error messages.
changingColCpy := changingCol.Clone()
changingColCpy.Name = oldCol.Name
return w.updateColumnAndIndexes(tbl, oldCol, changingColCpy, changingIdxs, reorgInfo)
return w.updateCurrentElement(tbl, reorgInfo)
})
if err != nil {
if dbterror.ErrWaitReorgTimeout.Equal(err) {
@@ -999,17 +1002,18 @@ func BuildElements(changingCol *model.ColumnInfo, changingIdxs []*model.IndexInf
return elements
}
func (w *worker) updatePhysicalTableRow(t table.PhysicalTable, oldColInfo, colInfo *model.ColumnInfo, reorgInfo *reorgInfo) error {
func (w *worker) updatePhysicalTableRow(t table.PhysicalTable, reorgInfo *reorgInfo) error {
logutil.BgLogger().Info("[ddl] start to update table row", zap.String("job", reorgInfo.Job.String()), zap.String("reorgInfo", reorgInfo.String()))
return w.writePhysicalTableRecord(t, typeUpdateColumnWorker, nil, oldColInfo, colInfo, reorgInfo)
return w.writePhysicalTableRecord(t, typeUpdateColumnWorker, reorgInfo)
}
// TestReorgGoroutineRunning is only used in tests to indicate that the reorg goroutine has started.
var TestReorgGoroutineRunning = make(chan interface{})
// updateColumnAndIndexes handles the modify column reorganization state for a table.
func (w *worker) updateColumnAndIndexes(t table.Table, oldCol, col *model.ColumnInfo, idxes []*model.IndexInfo, reorgInfo *reorgInfo) error {
// updateCurrentElement updates the current element for reorgInfo.
func (w *worker) updateCurrentElement(t table.Table, reorgInfo *reorgInfo) error {
failpoint.Inject("mockInfiniteReorgLogic", func(val failpoint.Value) {
//nolint:forcetypeassert
if val.(bool) {
a := new(interface{})
TestReorgGoroutineRunning <- a
@@ -1024,7 +1028,8 @@ func (w *worker) updateColumnAndIndexes(t table.Table, oldCol, col *model.Column
})
// TODO: Support partition tables.
if bytes.Equal(reorgInfo.currElement.TypeKey, meta.ColumnElementKey) {
err := w.updatePhysicalTableRow(t.(table.PhysicalTable), oldCol, col, reorgInfo)
//nolint:forcetypeassert
err := w.updatePhysicalTableRow(t.(table.PhysicalTable), reorgInfo)
if err != nil {
return errors.Trace(err)
}
@@ -1035,6 +1040,7 @@ func (w *worker) updateColumnAndIndexes(t table.Table, oldCol, col *model.Column
if err != nil {
return errors.Trace(err)
}
//nolint:forcetypeassert
originalStartHandle, originalEndHandle, err := getTableRange(reorgInfo.d.jobContext(reorgInfo.Job), reorgInfo.d, t.(table.PhysicalTable), currentVer.Ver, reorgInfo.Job.Priority)
if err != nil {
return errors.Trace(err)
@@ -1044,8 +1050,8 @@ func (w *worker) updateColumnAndIndexes(t table.Table, oldCol, col *model.Column
startElementOffsetToResetHandle := -1
// This backfill job starts with backfilling index data, whose index ID is currElement.ID.
if bytes.Equal(reorgInfo.currElement.TypeKey, meta.IndexElementKey) {
for i, idx := range idxes {
if reorgInfo.currElement.ID == idx.ID {
for i, element := range reorgInfo.elements[1:] {
if reorgInfo.currElement.ID == element.ID {
startElementOffset = i
startElementOffsetToResetHandle = i
break
@@ -1053,7 +1059,7 @@ func (w *worker) updateColumnAndIndexes(t table.Table, oldCol, col *model.Column
}
}
for i := startElementOffset; i < len(idxes); i++ {
for i := startElementOffset; i < len(reorgInfo.elements[1:]); i++ {
// This backfill job was interrupted during processing. At that time, the element was reorgInfo.elements[i+1] and the handle range was [reorgInfo.StartHandle, reorgInfo.EndHandle].
// The handle range of the remaining elements is then [originalStartHandle, originalEndHandle].
if i == startElementOffsetToResetHandle+1 {
@@ -1076,7 +1082,7 @@ func (w *worker) updateColumnAndIndexes(t table.Table, oldCol, col *model.Column
if err != nil {
return errors.Trace(err)
}
err = w.addTableIndex(t, idxes[i], reorgInfo)
err = w.addTableIndex(t, reorgInfo)
if err != nil {
return errors.Trace(err)
}
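The loops above rely on the element layout that BuildElements' signature suggests: elements[0] is the changing column and elements[1:] are the indexes to rebuild, which is why iterating reorgInfo.elements[1:] can replace the old iteration over idxes. A sketch consistent with that contract, using simplified types:

```go
package sketch

type elem struct {
	TypeKey string // "column" or "index" in this sketch
	ID      int64
}

// buildElements mirrors the assumed layout: the changing column first,
// followed by one element per changing index.
func buildElements(colID int64, idxIDs []int64) []elem {
	els := make([]elem, 0, len(idxIDs)+1)
	els = append(els, elem{TypeKey: "column", ID: colID})
	for _, id := range idxIDs {
		els = append(els, elem{TypeKey: "index", ID: id})
	}
	return els
}
```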
@@ -1101,7 +1107,20 @@ type updateColumnWorker struct {
jobContext *JobContext
}
func newUpdateColumnWorker(sessCtx sessionctx.Context, id int, t table.PhysicalTable, oldCol, newCol *model.ColumnInfo, decodeColMap map[int64]decoder.Column, reorgInfo *reorgInfo, jc *JobContext) *updateColumnWorker {
func newUpdateColumnWorker(sessCtx sessionctx.Context, id int, t table.PhysicalTable, decodeColMap map[int64]decoder.Column, reorgInfo *reorgInfo, jc *JobContext) *updateColumnWorker {
if !bytes.Equal(reorgInfo.currElement.TypeKey, meta.ColumnElementKey) {
logutil.BgLogger().Error("Element type for updateColumnWorker incorrect", zap.String("jobQuery", reorgInfo.Query),
zap.String("reorgInfo", reorgInfo.String()))
return nil
}
var oldCol, newCol *model.ColumnInfo
for _, col := range t.WritableCols() {
if col.ID == reorgInfo.currElement.ID {
newCol = col.ColumnInfo
oldCol = table.FindCol(t.Cols(), getChangingColumnOriginName(newCol)).ColumnInfo
break
}
}
rowDecoder := decoder.NewRowDecoder(t, t.WritableCols(), decodeColMap)
return &updateColumnWorker{
backfillWorker: newBackfillWorker(sessCtx, id, t, reorgInfo),
@@ -1126,7 +1145,7 @@ type rowRecord struct {
}
// getNextKey gets the next handle of the entry that we are going to process.
func (w *updateColumnWorker) getNextKey(taskRange reorgBackfillTask,
func (*updateColumnWorker) getNextKey(taskRange reorgBackfillTask,
taskDone bool, lastAccessedHandle kv.Key) (nextHandle kv.Key) {
if !taskDone {
// The task is not done. So we need to pick the last processed entry's handle and add one.
@@ -1215,10 +1234,12 @@ func (w *updateColumnWorker) getRowRecord(handle kv.Handle, recordKey []byte, ra
}
if w.sessCtx.GetSessionVars().StmtCtx.GetWarnings() != nil && len(w.sessCtx.GetSessionVars().StmtCtx.GetWarnings()) != 0 {
warn := w.sessCtx.GetSessionVars().StmtCtx.GetWarnings()
//nolint:forcetypeassert
recordWarning = errors.Cause(w.reformatErrors(warn[0].Err)).(*terror.Error)
}
failpoint.Inject("MockReorgTimeoutInOneRegion", func(val failpoint.Value) {
//nolint:forcetypeassert
if val.(bool) {
if handle.IntValue() == 3000 && atomic.CompareAndSwapInt32(&TestCheckReorgTimeout, 0, 1) {
failpoint.Return(errors.Trace(dbterror.ErrWaitReorgTimeout))
@@ -1508,6 +1529,7 @@ func checkForNullValue(ctx context.Context, sctx sessionctx.Context, isDataTrunc
}
}
buf.WriteString(" limit 1")
//nolint:forcetypeassert
rows, _, err := sctx.(sqlexec.RestrictedSQLExecutor).ExecRestrictedSQL(ctx, nil, buf.String(), paramsList...)
if err != nil {
return errors.Trace(err)
@@ -1656,6 +1678,7 @@ func modifyColsFromNull2NotNull(w *worker, dbInfo *model.DBInfo, tblInfo *model.
skipCheck := false
failpoint.Inject("skipMockContextDoExec", func(val failpoint.Value) {
//nolint:forcetypeassert
if val.(bool) {
skipCheck = true
}
@@ -1779,3 +1802,12 @@ func getChangingIndexOriginName(changingIdx *model.IndexInfo) string {
}
return idxName[:pos]
}
func getChangingColumnOriginName(changingColumn *model.ColumnInfo) string {
columnName := strings.TrimPrefix(changingColumn.Name.O, changingColumnPrefix)
var pos int
if pos = strings.LastIndex(columnName, "_"); pos == -1 {
return columnName
}
return columnName[:pos]
}
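For illustration, the new getChangingColumnOriginName recovers the original column name by stripping the changing-column prefix and then the trailing "_N" suffix. A runnable sketch of the same string logic, with the prefix inlined ("_Col$_" is an assumption here, not taken from this diff):

```go
package sketch

import (
	"fmt"
	"strings"
)

// originName reproduces the function above with the prefix inlined.
func originName(changing string) string {
	name := strings.TrimPrefix(changing, "_Col$_")
	if pos := strings.LastIndex(name, "_"); pos != -1 {
		return name[:pos]
	}
	return name
}

func demo() {
	fmt.Println(originName("_Col$_a_1"))   // a
	fmt.Println(originName("_Col$_a_b_1")) // a_b: only the last "_" is cut
}
```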

ddl/index.go

@@ -15,6 +15,7 @@
package ddl
import (
"bytes"
"context"
"strings"
"sync/atomic"
@@ -697,7 +698,7 @@ func doReorgWorkForCreateIndex(w *worker, d *ddlCtx, t *meta.Meta, job *model.Jo
func() {
addIndexErr = dbterror.ErrCancelledDDLJob.GenWithStack("add table `%v` index `%v` panic", tbl.Meta().Name, indexInfo.Name)
}, false)
return w.addTableIndex(tbl, indexInfo, reorgInfo)
return w.addTableIndex(tbl, reorgInfo)
})
if err != nil {
if dbterror.ErrWaitReorgTimeout.Equal(err) {
@@ -768,6 +769,7 @@ func onDropIndex(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error) {
removeIndexInfo(tblInfo, indexInfo)
failpoint.Inject("mockExceedErrorLimit", func(val failpoint.Value) {
//nolint:forcetypeassert
if val.(bool) {
panic("panic test in cancelling add index")
}
@@ -1008,8 +1010,19 @@ type addIndexWorker struct {
distinctCheckFlags []bool
}
func newAddIndexWorker(sessCtx sessionctx.Context, id int, t table.PhysicalTable, indexInfo *model.IndexInfo, decodeColMap map[int64]decoder.Column, reorgInfo *reorgInfo, jc *JobContext) *addIndexWorker {
index := tables.NewIndex(t.GetPhysicalID(), t.Meta(), indexInfo)
func newAddIndexWorker(sessCtx sessionctx.Context, id int, t table.PhysicalTable, decodeColMap map[int64]decoder.Column, reorgInfo *reorgInfo, jc *JobContext) *addIndexWorker {
if !bytes.Equal(reorgInfo.currElement.TypeKey, meta.IndexElementKey) {
logutil.BgLogger().Error("Element type for addIndexWorker incorrect", zap.String("jobQuery", reorgInfo.Query),
zap.String("reorgInfo", reorgInfo.String()))
return nil
}
var index table.Index
for _, idx := range t.Indices() {
if idx.Meta().ID == reorgInfo.currElement.ID {
index = idx
break
}
}
rowDecoder := decoder.NewRowDecoder(t, t.WritableCols(), decodeColMap)
return &addIndexWorker{
baseIndexWorker: baseIndexWorker{
@@ -1266,6 +1279,7 @@ func (w *addIndexWorker) batchCheckUniqueKey(txn kv.Transaction, idxRecords []*i
// BackfillDataInTxn adds w.batchCnt index records at a time; the default value of w.batchCnt is 128.
func (w *addIndexWorker) BackfillDataInTxn(handleRange reorgBackfillTask) (taskCtx backfillTaskContext, errInTxn error) {
failpoint.Inject("errorMockPanic", func(val failpoint.Value) {
//nolint:forcetypeassert
if val.(bool) {
panic("panic test")
}
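BackfillDataInTxn's batching (at most w.batchCnt records per transaction) bounds both transaction size and lock hold time. A minimal sketch of that shape, independent of the real kv transaction machinery:

```go
package sketch

// backfillBatch writes at most batchCnt records and reports how many
// were handled, so the caller can start the next transaction there.
func backfillBatch[T any](records []T, batchCnt int, write func(T) error) (int, error) {
	n := 0
	for _, r := range records {
		if n >= batchCnt {
			break
		}
		if err := write(r); err != nil {
			return n, err
		}
		n++
	}
	return n, nil
}
```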
@@ -1328,13 +1342,13 @@ func (w *addIndexWorker) BackfillDataInTxn(handleRange reorgBackfillTask) (taskC
return
}
func (w *worker) addPhysicalTableIndex(t table.PhysicalTable, indexInfo *model.IndexInfo, reorgInfo *reorgInfo) error {
func (w *worker) addPhysicalTableIndex(t table.PhysicalTable, reorgInfo *reorgInfo) error {
logutil.BgLogger().Info("[ddl] start to add table index", zap.String("job", reorgInfo.Job.String()), zap.String("reorgInfo", reorgInfo.String()))
return w.writePhysicalTableRecord(t, typeAddIndexWorker, indexInfo, nil, nil, reorgInfo)
return w.writePhysicalTableRecord(t, typeAddIndexWorker, reorgInfo)
}
// addTableIndex handles the add index reorganization state for a table.
func (w *worker) addTableIndex(t table.Table, idx *model.IndexInfo, reorgInfo *reorgInfo) error {
func (w *worker) addTableIndex(t table.Table, reorgInfo *reorgInfo) error {
var err error
if tbl, ok := t.(table.PartitionedTable); ok {
var finish bool
@@ -1343,7 +1357,7 @@ func (w *worker) addTableIndex(t table.Table, idx *model.IndexInfo, reorgInfo *r
if p == nil {
return dbterror.ErrCancelledDDLJob.GenWithStack("Can not find partition id %d for table %d", reorgInfo.PhysicalTableID, t.Meta().ID)
}
err = w.addPhysicalTableIndex(p, idx, reorgInfo)
err = w.addPhysicalTableIndex(p, reorgInfo)
if err != nil {
break
}
@@ -1353,7 +1367,8 @@ func (w *worker) addTableIndex(t table.Table, idx *model.IndexInfo, reorgInfo *r
}
}
} else {
err = w.addPhysicalTableIndex(t.(table.PhysicalTable), idx, reorgInfo)
//nolint:forcetypeassert
err = w.addPhysicalTableIndex(t.(table.PhysicalTable), reorgInfo)
}
return errors.Trace(err)
}
@@ -1379,8 +1394,10 @@ func (w *worker) updateReorgInfo(t table.PartitionedTable, reorg *reorgInfo) (bo
}
failpoint.Inject("mockUpdateCachedSafePoint", func(val failpoint.Value) {
//nolint:forcetypeassert
if val.(bool) {
ts := oracle.GoTimeToTS(time.Now())
//nolint:forcetypeassert
s := reorg.d.store.(tikv.Storage)
s.UpdateSPCache(ts, time.Now())
time.Sleep(time.Second * 3)
@@ -1483,6 +1500,7 @@ func newCleanUpIndexWorker(sessCtx sessionctx.Context, id int, t table.PhysicalT
func (w *cleanUpIndexWorker) BackfillDataInTxn(handleRange reorgBackfillTask) (taskCtx backfillTaskContext, errInTxn error) {
failpoint.Inject("errorMockPanic", func(val failpoint.Value) {
//nolint:forcetypeassert
if val.(bool) {
panic("panic test")
}
@@ -1529,7 +1547,7 @@ func (w *cleanUpIndexWorker) BackfillDataInTxn(handleRange reorgBackfillTask) (t
// cleanupPhysicalTableIndex handles the drop partition reorganization state for a non-partitioned table or a partition.
func (w *worker) cleanupPhysicalTableIndex(t table.PhysicalTable, reorgInfo *reorgInfo) error {
logutil.BgLogger().Info("[ddl] start to clean up index", zap.String("job", reorgInfo.Job.String()), zap.String("reorgInfo", reorgInfo.String()))
return w.writePhysicalTableRecord(t, typeCleanUpIndexWorker, nil, nil, nil, reorgInfo)
return w.writePhysicalTableRecord(t, typeCleanUpIndexWorker, reorgInfo)
}
// cleanupGlobalIndex handles the drop partition reorganization state to clean up index entries of partitions.