From 62acece63c19339278f1cefc1606684981fd86d3 Mon Sep 17 00:00:00 2001 From: Hangjie Mo Date: Mon, 15 Aug 2022 10:12:50 +0800 Subject: [PATCH] ddl: refine param for backfillWorker (#37070) --- build/nogo_config.json | 9 +++++++ ddl/backfilling.go | 15 +++++------ ddl/column.go | 60 ++++++++++++++++++++++++++++++++---------- ddl/index.go | 36 ++++++++++++++++++------- 4 files changed, 89 insertions(+), 31 deletions(-) diff --git a/build/nogo_config.json b/build/nogo_config.json index 079077eb41..de21c62631 100644 --- a/build/nogo_config.json +++ b/build/nogo_config.json @@ -154,6 +154,9 @@ ".*_generated\\.go$": "ignore generated code" }, "only_files": { + "ddl/backfilling.go": "ddl/backfilling.go", + "ddl/column.go": "ddl/column.go", + "ddl/index.go": "ddl/index.go", "server/conn.go": "server/conn.go", "server/conn_stmt.go": "server/conn_stmt.go", "server/conn_test.go": "server/conn_test.go", @@ -283,6 +286,9 @@ ".*_generated\\.go$": "ignore generated code" }, "only_files": { + "ddl/backfilling.go": "ddl/backfilling.go", + "ddl/column.go": "ddl/column.go", + "ddl/index.go": "ddl/index.go", "expression/builtin_cast.go": "expression/builtin_cast code", "server/conn.go": "server/conn.go", "server/conn_stmt.go": "server/conn_stmt.go", @@ -638,6 +644,9 @@ ".*_generated\\.go$": "ignore generated code" }, "only_files": { + "ddl/backfilling.go": "ddl/backfilling.go", + "ddl/column.go": "ddl/column.go", + "ddl/index.go": "ddl/index.go", "expression/builtin_cast.go": "enable expression/builtin_cast.go", "planner/core/plan.go": "planner/core/plan.go", "server/conn.go": "server/conn.go", diff --git a/ddl/backfilling.go b/ddl/backfilling.go index f45a75228c..72a3135c2f 100644 --- a/ddl/backfilling.go +++ b/ddl/backfilling.go @@ -164,7 +164,7 @@ func newBackfillWorker(sessCtx sessionctx.Context, id int, t table.PhysicalTable sessCtx: sessCtx, taskCh: make(chan *reorgBackfillTask, 1), resultCh: make(chan *backfillResult, 1), - priority: kv.PriorityLow, + priority: reorgInfo.Job.Priority, } } @@ -365,7 +365,7 @@ func splitTableRanges(t table.PhysicalTable, store kv.Storage, startKey, endKey return ranges, nil } -func (w *worker) waitTaskResults(workers []*backfillWorker, taskCnt int, +func (*worker) waitTaskResults(workers []*backfillWorker, taskCnt int, totalAddedCount *int64, startKey kv.Key) (kv.Key, int64, error) { var ( addedCount int64 @@ -585,7 +585,7 @@ func setSessCtxLocation(sctx sessionctx.Context, info *reorgInfo) error { // // The above operations are completed in a transaction. // Finally, update the concurrent processing of the total number of rows, and store the completed handle value. 
-func (w *worker) writePhysicalTableRecord(t table.PhysicalTable, bfWorkerType backfillWorkerType, indexInfo *model.IndexInfo, oldColInfo, colInfo *model.ColumnInfo, reorgInfo *reorgInfo) error { +func (w *worker) writePhysicalTableRecord(t table.PhysicalTable, bfWorkerType backfillWorkerType, reorgInfo *reorgInfo) error { job := reorgInfo.Job totalAddedCount := job.GetRowCount() @@ -604,6 +604,7 @@ func (w *worker) writePhysicalTableRecord(t table.PhysicalTable, bfWorkerType ba } failpoint.Inject("MockCaseWhenParseFailure", func(val failpoint.Value) { + //nolint:forcetypeassert if val.(bool) { failpoint.Return(errors.New("job.ErrCount:" + strconv.Itoa(int(job.ErrorCount)) + ", mock unknown type: ast.whenClause.")) } @@ -656,20 +657,17 @@ func (w *worker) writePhysicalTableRecord(t table.PhysicalTable, bfWorkerType ba switch bfWorkerType { case typeAddIndexWorker: - idxWorker := newAddIndexWorker(sessCtx, i, t, indexInfo, decodeColMap, reorgInfo, jc) - idxWorker.priority = job.Priority + idxWorker := newAddIndexWorker(sessCtx, i, t, decodeColMap, reorgInfo, jc) backfillWorkers = append(backfillWorkers, idxWorker.backfillWorker) go idxWorker.backfillWorker.run(reorgInfo.d, idxWorker, job) case typeUpdateColumnWorker: // Setting InCreateOrAlterStmt tells the difference between SELECT casting and ALTER COLUMN casting. sessCtx.GetSessionVars().StmtCtx.InCreateOrAlterStmt = true - updateWorker := newUpdateColumnWorker(sessCtx, i, t, oldColInfo, colInfo, decodeColMap, reorgInfo, jc) - updateWorker.priority = job.Priority + updateWorker := newUpdateColumnWorker(sessCtx, i, t, decodeColMap, reorgInfo, jc) backfillWorkers = append(backfillWorkers, updateWorker.backfillWorker) go updateWorker.backfillWorker.run(reorgInfo.d, updateWorker, job) case typeCleanUpIndexWorker: idxWorker := newCleanUpIndexWorker(sessCtx, i, t, decodeColMap, reorgInfo, jc) - idxWorker.priority = job.Priority backfillWorkers = append(backfillWorkers, idxWorker.backfillWorker) go idxWorker.backfillWorker.run(reorgInfo.d, idxWorker, job) default: @@ -684,6 +682,7 @@ func (w *worker) writePhysicalTableRecord(t table.PhysicalTable, bfWorkerType ba } failpoint.Inject("checkBackfillWorkerNum", func(val failpoint.Value) { + //nolint:forcetypeassert if val.(bool) { num := int(atomic.LoadInt32(&TestCheckWorkerNumber)) if num != 0 { diff --git a/ddl/column.go b/ddl/column.go index 595d71ec34..e3ccf6ed9b 100644 --- a/ddl/column.go +++ b/ddl/column.go @@ -111,6 +111,7 @@ func onAddColumn(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error) } failpoint.Inject("errorBeforeDecodeArgs", func(val failpoint.Value) { + //nolint:forcetypeassert if val.(bool) { failpoint.Return(ver, errors.New("occur an error before decode args")) } @@ -205,7 +206,7 @@ func setIndicesState(indexInfos []*model.IndexInfo, state model.SchemaState) { } } -func checkDropColumnForStatePublic(tblInfo *model.TableInfo, colInfo *model.ColumnInfo) (err error) { +func checkDropColumnForStatePublic(colInfo *model.ColumnInfo) (err error) { // When the dropping column has not-null flag and it hasn't the default value, we can backfill the column value like "add column". // NOTE: If the state of StateWriteOnly can be rollbacked, we'd better reconsider the original default value. // And we need consider the column without not-null flag. 
@@ -250,7 +251,7 @@ func onDropColumn(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error)
 		colInfo.State = model.StateWriteOnly
 		setIndicesState(idxInfos, model.StateWriteOnly)
 		tblInfo.MoveColumnInfo(colInfo.Offset, len(tblInfo.Columns)-1)
-		err = checkDropColumnForStatePublic(tblInfo, colInfo)
+		err = checkDropColumnForStatePublic(colInfo)
 		if err != nil {
 			return ver, errors.Trace(err)
 		}
@@ -521,6 +522,7 @@ func (w *worker) onModifyColumn(d *ddlCtx, t *meta.Meta, job *model.Job) (ver in
 	}
 
 	failpoint.Inject("uninitializedOffsetAndState", func(val failpoint.Value) {
+		//nolint:forcetypeassert
 		if val.(bool) {
 			if modifyInfo.newCol.State != model.StatePublic {
 				failpoint.Return(ver, errors.New("the column state is wrong"))
@@ -690,6 +692,7 @@ func (w *worker) doModifyColumnTypeWithData(
 	defer w.sessPool.put(sctx)
 
 	ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL)
+	//nolint:forcetypeassert
 	_, _, err = sctx.(sqlexec.RestrictedSQLExecutor).ExecRestrictedSQL(ctx, nil, valStr)
 	if err != nil {
 		job.State = model.JobStateCancelled
@@ -815,7 +818,7 @@ func doReorgWorkForModifyColumn(w *worker, d *ddlCtx, t *meta.Meta, job *model.J
 			// Use old column name to generate less confusing error messages.
 			changingColCpy := changingCol.Clone()
 			changingColCpy.Name = oldCol.Name
-			return w.updateColumnAndIndexes(tbl, oldCol, changingColCpy, changingIdxs, reorgInfo)
+			return w.updateCurrentElement(tbl, reorgInfo)
 		})
 	if err != nil {
 		if dbterror.ErrWaitReorgTimeout.Equal(err) {
@@ -999,17 +1002,18 @@ func BuildElements(changingCol *model.ColumnInfo, changingIdxs []*model.IndexInf
 	return elements
 }
 
-func (w *worker) updatePhysicalTableRow(t table.PhysicalTable, oldColInfo, colInfo *model.ColumnInfo, reorgInfo *reorgInfo) error {
+func (w *worker) updatePhysicalTableRow(t table.PhysicalTable, reorgInfo *reorgInfo) error {
 	logutil.BgLogger().Info("[ddl] start to update table row", zap.String("job", reorgInfo.Job.String()), zap.String("reorgInfo", reorgInfo.String()))
-	return w.writePhysicalTableRecord(t, typeUpdateColumnWorker, nil, oldColInfo, colInfo, reorgInfo)
+	return w.writePhysicalTableRecord(t, typeUpdateColumnWorker, reorgInfo)
 }
 
 // TestReorgGoroutineRunning is only used in test to indicate the reorg goroutine has been started.
 var TestReorgGoroutineRunning = make(chan interface{})
 
-// updateColumnAndIndexes handles the modify column reorganization state for a table.
-func (w *worker) updateColumnAndIndexes(t table.Table, oldCol, col *model.ColumnInfo, idxes []*model.IndexInfo, reorgInfo *reorgInfo) error {
+// updateCurrentElement updates the current element for reorgInfo.
+func (w *worker) updateCurrentElement(t table.Table, reorgInfo *reorgInfo) error {
 	failpoint.Inject("mockInfiniteReorgLogic", func(val failpoint.Value) {
+		//nolint:forcetypeassert
 		if val.(bool) {
 			a := new(interface{})
 			TestReorgGoroutineRunning <- a
@@ -1024,7 +1028,8 @@ func (w *worker) updateColumnAndIndexes(t table.Table, oldCol, col *model.Column
 	})
 	// TODO: Support partition tables.
if bytes.Equal(reorgInfo.currElement.TypeKey, meta.ColumnElementKey) { - err := w.updatePhysicalTableRow(t.(table.PhysicalTable), oldCol, col, reorgInfo) + //nolint:forcetypeassert + err := w.updatePhysicalTableRow(t.(table.PhysicalTable), reorgInfo) if err != nil { return errors.Trace(err) } @@ -1035,6 +1040,7 @@ func (w *worker) updateColumnAndIndexes(t table.Table, oldCol, col *model.Column if err != nil { return errors.Trace(err) } + //nolint:forcetypeassert originalStartHandle, originalEndHandle, err := getTableRange(reorgInfo.d.jobContext(reorgInfo.Job), reorgInfo.d, t.(table.PhysicalTable), currentVer.Ver, reorgInfo.Job.Priority) if err != nil { return errors.Trace(err) @@ -1044,8 +1050,8 @@ func (w *worker) updateColumnAndIndexes(t table.Table, oldCol, col *model.Column startElementOffsetToResetHandle := -1 // This backfill job starts with backfilling index data, whose index ID is currElement.ID. if bytes.Equal(reorgInfo.currElement.TypeKey, meta.IndexElementKey) { - for i, idx := range idxes { - if reorgInfo.currElement.ID == idx.ID { + for i, element := range reorgInfo.elements[1:] { + if reorgInfo.currElement.ID == element.ID { startElementOffset = i startElementOffsetToResetHandle = i break @@ -1053,7 +1059,7 @@ func (w *worker) updateColumnAndIndexes(t table.Table, oldCol, col *model.Column } } - for i := startElementOffset; i < len(idxes); i++ { + for i := startElementOffset; i < len(reorgInfo.elements[1:]); i++ { // This backfill job has been exited during processing. At that time, the element is reorgInfo.elements[i+1] and handle range is [reorgInfo.StartHandle, reorgInfo.EndHandle]. // Then the handle range of the rest elements' is [originalStartHandle, originalEndHandle]. if i == startElementOffsetToResetHandle+1 { @@ -1076,7 +1082,7 @@ func (w *worker) updateColumnAndIndexes(t table.Table, oldCol, col *model.Column if err != nil { return errors.Trace(err) } - err = w.addTableIndex(t, idxes[i], reorgInfo) + err = w.addTableIndex(t, reorgInfo) if err != nil { return errors.Trace(err) } @@ -1101,7 +1107,20 @@ type updateColumnWorker struct { jobContext *JobContext } -func newUpdateColumnWorker(sessCtx sessionctx.Context, id int, t table.PhysicalTable, oldCol, newCol *model.ColumnInfo, decodeColMap map[int64]decoder.Column, reorgInfo *reorgInfo, jc *JobContext) *updateColumnWorker { +func newUpdateColumnWorker(sessCtx sessionctx.Context, id int, t table.PhysicalTable, decodeColMap map[int64]decoder.Column, reorgInfo *reorgInfo, jc *JobContext) *updateColumnWorker { + if !bytes.Equal(reorgInfo.currElement.TypeKey, meta.ColumnElementKey) { + logutil.BgLogger().Error("Element type for updateColumnWorker incorrect", zap.String("jobQuery", reorgInfo.Query), + zap.String("reorgInfo", reorgInfo.String())) + return nil + } + var oldCol, newCol *model.ColumnInfo + for _, col := range t.WritableCols() { + if col.ID == reorgInfo.currElement.ID { + newCol = col.ColumnInfo + oldCol = table.FindCol(t.Cols(), getChangingColumnOriginName(newCol)).ColumnInfo + break + } + } rowDecoder := decoder.NewRowDecoder(t, t.WritableCols(), decodeColMap) return &updateColumnWorker{ backfillWorker: newBackfillWorker(sessCtx, id, t, reorgInfo), @@ -1126,7 +1145,7 @@ type rowRecord struct { } // getNextKey gets next handle of entry that we are going to process. -func (w *updateColumnWorker) getNextKey(taskRange reorgBackfillTask, +func (*updateColumnWorker) getNextKey(taskRange reorgBackfillTask, taskDone bool, lastAccessedHandle kv.Key) (nextHandle kv.Key) { if !taskDone { // The task is not done. 
So we need to pick the last processed entry's handle and add one. @@ -1215,10 +1234,12 @@ func (w *updateColumnWorker) getRowRecord(handle kv.Handle, recordKey []byte, ra } if w.sessCtx.GetSessionVars().StmtCtx.GetWarnings() != nil && len(w.sessCtx.GetSessionVars().StmtCtx.GetWarnings()) != 0 { warn := w.sessCtx.GetSessionVars().StmtCtx.GetWarnings() + //nolint:forcetypeassert recordWarning = errors.Cause(w.reformatErrors(warn[0].Err)).(*terror.Error) } failpoint.Inject("MockReorgTimeoutInOneRegion", func(val failpoint.Value) { + //nolint:forcetypeassert if val.(bool) { if handle.IntValue() == 3000 && atomic.CompareAndSwapInt32(&TestCheckReorgTimeout, 0, 1) { failpoint.Return(errors.Trace(dbterror.ErrWaitReorgTimeout)) @@ -1508,6 +1529,7 @@ func checkForNullValue(ctx context.Context, sctx sessionctx.Context, isDataTrunc } } buf.WriteString(" limit 1") + //nolint:forcetypeassert rows, _, err := sctx.(sqlexec.RestrictedSQLExecutor).ExecRestrictedSQL(ctx, nil, buf.String(), paramsList...) if err != nil { return errors.Trace(err) @@ -1656,6 +1678,7 @@ func modifyColsFromNull2NotNull(w *worker, dbInfo *model.DBInfo, tblInfo *model. skipCheck := false failpoint.Inject("skipMockContextDoExec", func(val failpoint.Value) { + //nolint:forcetypeassert if val.(bool) { skipCheck = true } @@ -1779,3 +1802,12 @@ func getChangingIndexOriginName(changingIdx *model.IndexInfo) string { } return idxName[:pos] } + +func getChangingColumnOriginName(changingColumn *model.ColumnInfo) string { + columnName := strings.TrimPrefix(changingColumn.Name.O, changingColumnPrefix) + var pos int + if pos = strings.LastIndex(columnName, "_"); pos == -1 { + return columnName + } + return columnName[:pos] +} diff --git a/ddl/index.go b/ddl/index.go index d2426c05d4..85b860ac4e 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -15,6 +15,7 @@ package ddl import ( + "bytes" "context" "strings" "sync/atomic" @@ -697,7 +698,7 @@ func doReorgWorkForCreateIndex(w *worker, d *ddlCtx, t *meta.Meta, job *model.Jo func() { addIndexErr = dbterror.ErrCancelledDDLJob.GenWithStack("add table `%v` index `%v` panic", tbl.Meta().Name, indexInfo.Name) }, false) - return w.addTableIndex(tbl, indexInfo, reorgInfo) + return w.addTableIndex(tbl, reorgInfo) }) if err != nil { if dbterror.ErrWaitReorgTimeout.Equal(err) { @@ -768,6 +769,7 @@ func onDropIndex(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error) { removeIndexInfo(tblInfo, indexInfo) failpoint.Inject("mockExceedErrorLimit", func(val failpoint.Value) { + //nolint:forcetypeassert if val.(bool) { panic("panic test in cancelling add index") } @@ -1008,8 +1010,19 @@ type addIndexWorker struct { distinctCheckFlags []bool } -func newAddIndexWorker(sessCtx sessionctx.Context, id int, t table.PhysicalTable, indexInfo *model.IndexInfo, decodeColMap map[int64]decoder.Column, reorgInfo *reorgInfo, jc *JobContext) *addIndexWorker { - index := tables.NewIndex(t.GetPhysicalID(), t.Meta(), indexInfo) +func newAddIndexWorker(sessCtx sessionctx.Context, id int, t table.PhysicalTable, decodeColMap map[int64]decoder.Column, reorgInfo *reorgInfo, jc *JobContext) *addIndexWorker { + if !bytes.Equal(reorgInfo.currElement.TypeKey, meta.IndexElementKey) { + logutil.BgLogger().Error("Element type for addIndexWorker incorrect", zap.String("jobQuery", reorgInfo.Query), + zap.String("reorgInfo", reorgInfo.String())) + return nil + } + var index table.Index + for _, idx := range t.Indices() { + if idx.Meta().ID == reorgInfo.currElement.ID { + index = idx + break + } + } rowDecoder := decoder.NewRowDecoder(t, 
t.WritableCols(), decodeColMap) return &addIndexWorker{ baseIndexWorker: baseIndexWorker{ @@ -1266,6 +1279,7 @@ func (w *addIndexWorker) batchCheckUniqueKey(txn kv.Transaction, idxRecords []*i // BackfillDataInTxn will add w.batchCnt indices once, default value of w.batchCnt is 128. func (w *addIndexWorker) BackfillDataInTxn(handleRange reorgBackfillTask) (taskCtx backfillTaskContext, errInTxn error) { failpoint.Inject("errorMockPanic", func(val failpoint.Value) { + //nolint:forcetypeassert if val.(bool) { panic("panic test") } @@ -1328,13 +1342,13 @@ func (w *addIndexWorker) BackfillDataInTxn(handleRange reorgBackfillTask) (taskC return } -func (w *worker) addPhysicalTableIndex(t table.PhysicalTable, indexInfo *model.IndexInfo, reorgInfo *reorgInfo) error { +func (w *worker) addPhysicalTableIndex(t table.PhysicalTable, reorgInfo *reorgInfo) error { logutil.BgLogger().Info("[ddl] start to add table index", zap.String("job", reorgInfo.Job.String()), zap.String("reorgInfo", reorgInfo.String())) - return w.writePhysicalTableRecord(t, typeAddIndexWorker, indexInfo, nil, nil, reorgInfo) + return w.writePhysicalTableRecord(t, typeAddIndexWorker, reorgInfo) } // addTableIndex handles the add index reorganization state for a table. -func (w *worker) addTableIndex(t table.Table, idx *model.IndexInfo, reorgInfo *reorgInfo) error { +func (w *worker) addTableIndex(t table.Table, reorgInfo *reorgInfo) error { var err error if tbl, ok := t.(table.PartitionedTable); ok { var finish bool @@ -1343,7 +1357,7 @@ func (w *worker) addTableIndex(t table.Table, idx *model.IndexInfo, reorgInfo *r if p == nil { return dbterror.ErrCancelledDDLJob.GenWithStack("Can not find partition id %d for table %d", reorgInfo.PhysicalTableID, t.Meta().ID) } - err = w.addPhysicalTableIndex(p, idx, reorgInfo) + err = w.addPhysicalTableIndex(p, reorgInfo) if err != nil { break } @@ -1353,7 +1367,8 @@ func (w *worker) addTableIndex(t table.Table, idx *model.IndexInfo, reorgInfo *r } } } else { - err = w.addPhysicalTableIndex(t.(table.PhysicalTable), idx, reorgInfo) + //nolint:forcetypeassert + err = w.addPhysicalTableIndex(t.(table.PhysicalTable), reorgInfo) } return errors.Trace(err) } @@ -1379,8 +1394,10 @@ func (w *worker) updateReorgInfo(t table.PartitionedTable, reorg *reorgInfo) (bo } failpoint.Inject("mockUpdateCachedSafePoint", func(val failpoint.Value) { + //nolint:forcetypeassert if val.(bool) { ts := oracle.GoTimeToTS(time.Now()) + //nolint:forcetypeassert s := reorg.d.store.(tikv.Storage) s.UpdateSPCache(ts, time.Now()) time.Sleep(time.Second * 3) @@ -1483,6 +1500,7 @@ func newCleanUpIndexWorker(sessCtx sessionctx.Context, id int, t table.PhysicalT func (w *cleanUpIndexWorker) BackfillDataInTxn(handleRange reorgBackfillTask) (taskCtx backfillTaskContext, errInTxn error) { failpoint.Inject("errorMockPanic", func(val failpoint.Value) { + //nolint:forcetypeassert if val.(bool) { panic("panic test") } @@ -1529,7 +1547,7 @@ func (w *cleanUpIndexWorker) BackfillDataInTxn(handleRange reorgBackfillTask) (t // cleanupPhysicalTableIndex handles the drop partition reorganization state for a non-partitioned table or a partition. 
func (w *worker) cleanupPhysicalTableIndex(t table.PhysicalTable, reorgInfo *reorgInfo) error { logutil.BgLogger().Info("[ddl] start to clean up index", zap.String("job", reorgInfo.Job.String()), zap.String("reorgInfo", reorgInfo.String())) - return w.writePhysicalTableRecord(t, typeCleanUpIndexWorker, nil, nil, nil, reorgInfo) + return w.writePhysicalTableRecord(t, typeCleanUpIndexWorker, reorgInfo) } // cleanupGlobalIndex handles the drop partition reorganization state to clean up index entries of partitions.
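
Note on the pattern above: across backfilling.go, column.go, and index.go the change is the same. writePhysicalTableRecord no longer threads indexInfo/oldColInfo/colInfo through to the workers, and callers no longer set worker priority by hand. Each worker constructor instead resolves its own target from reorgInfo.currElement (guarding the element type first) and inherits its KV priority from reorgInfo.Job.Priority. Below is a minimal, self-contained Go sketch of that guard-and-lookup shape; element, indexMeta, resolveIndexForWorker, and jobPriority are hypothetical stand-ins trimmed for illustration, not the actual TiDB types.

package main

import (
	"bytes"
	"fmt"
)

// Stand-ins for meta.ColumnElementKey / meta.IndexElementKey.
var (
	columnElementKey = []byte("_col_")
	indexElementKey  = []byte("_idx_")
)

// element mirrors the (ID, TypeKey) pair carried by reorgInfo.currElement.
type element struct {
	ID      int64
	TypeKey []byte
}

// reorgInfo is trimmed to the two fields this pattern needs.
type reorgInfo struct {
	currElement *element
	jobPriority int // stands in for reorgInfo.Job.Priority
}

type indexMeta struct {
	ID   int64
	Name string
}

// resolveIndexForWorker mirrors the guard-and-lookup in the patched
// newAddIndexWorker: verify the element type, then find the target index
// by element ID instead of taking *model.IndexInfo as a parameter.
func resolveIndexForWorker(indices []indexMeta, info *reorgInfo) (*indexMeta, error) {
	if !bytes.Equal(info.currElement.TypeKey, indexElementKey) {
		return nil, fmt.Errorf("element type for index worker incorrect: %q", info.currElement.TypeKey)
	}
	for i := range indices {
		if indices[i].ID == info.currElement.ID {
			return &indices[i], nil
		}
	}
	return nil, fmt.Errorf("index element %d not found", info.currElement.ID)
}

func main() {
	info := &reorgInfo{
		currElement: &element{ID: 2, TypeKey: indexElementKey},
		jobPriority: 1,
	}
	idx, err := resolveIndexForWorker([]indexMeta{{ID: 1, Name: "i1"}, {ID: 2, Name: "i2"}}, info)
	if err != nil {
		panic(err)
	}
	// The worker now backfills i2 at the job's priority instead of a
	// hard-coded low priority, matching the backfilling.go change.
	fmt.Printf("backfilling index %s at priority %d\n", idx.Name, info.jobPriority)
}

The patched newUpdateColumnWorker does the symmetric thing for columns: it guards on meta.ColumnElementKey, picks newCol from t.WritableCols() by currElement.ID, and recovers oldCol via getChangingColumnOriginName.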