diff --git a/ddl/column.go b/ddl/column.go
index 5ec5b74440..c00dd4ff53 100644
--- a/ddl/column.go
+++ b/ddl/column.go
@@ -170,7 +170,7 @@ func (d *ddl) onAddColumn(t *meta.Meta, job *model.Job) error {
 	}
 	if columnInfo.DefaultValue != nil || mysql.HasNotNullFlag(columnInfo.Flag) {
 		err = d.runReorgJob(func() error {
-			return d.backfillColumn(tbl, columnInfo, reorgInfo, job)
+			return d.addTableColumn(tbl, columnInfo, reorgInfo, job)
 		})
 		if terror.ErrorEqual(err, errWaitReorgTimeout) {
 			// if timeout, we should return, check for the owner and re-wait job done.
@@ -305,13 +305,13 @@ func (d *ddl) onDropColumn(t *meta.Meta, job *model.Job) error {
 // 3. For one row, if the row has been already deleted, skip to next row.
 // 4. If not deleted, check whether column data has existed, if existed, skip to next row.
 // 5. If column data doesn't exist, backfill the column with default value and then continue to handle next row.
-func (d *ddl) backfillColumn(t table.Table, columnInfo *model.ColumnInfo, reorgInfo *reorgInfo, job *model.Job) error {
+func (d *ddl) addTableColumn(t table.Table, columnInfo *model.ColumnInfo, reorgInfo *reorgInfo, job *model.Job) error {
 	seekHandle := reorgInfo.Handle
 	version := reorgInfo.SnapshotVer
 	count := job.GetRowCount()
 	for {
-		startTS := time.Now()
+		startTime := time.Now()
 		handles, err := d.getSnapshotRows(t, version, seekHandle)
 		if err != nil {
 			return errors.Trace(err)
 		}
@@ -321,8 +321,8 @@ func (d *ddl) backfillColumn(t table.Table, columnInfo *model.ColumnInfo, reorgI
 		count += int64(len(handles))
 		seekHandle = handles[len(handles)-1] + 1
-		sub := time.Since(startTS).Seconds()
-		err = d.backfillColumnData(t, columnInfo, handles, reorgInfo)
+		sub := time.Since(startTime).Seconds()
+		err = d.backfillColumn(t, columnInfo, handles, reorgInfo)
 		if err != nil {
 			log.Warnf("[ddl] added column for %v rows failed, take time %v", count, sub)
 			return errors.Trace(err)
 		}
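The hunks above leave the outer reorg loop intact: fetch a batch of row handles from the snapshot starting at seekHandle, backfill them, then resume after the last handle. A minimal, self-contained sketch of that driver pattern (fetchHandles is a toy stand-in for d.getSnapshotRows, not a TiDB API):

```go
package main

import "fmt"

// fetchHandles returns up to batchSize handles that are >= seek,
// mimicking a snapshot scan over existing row handles.
func fetchHandles(all []int64, seek int64, batchSize int) []int64 {
	var out []int64
	for _, h := range all {
		if h >= seek {
			out = append(out, h)
			if len(out) == batchSize {
				break
			}
		}
	}
	return out
}

func main() {
	table := []int64{1, 2, 5, 8, 9, 12, 20} // existing row handles
	seekHandle := int64(0)
	for {
		handles := fetchHandles(table, seekHandle, 3)
		if len(handles) == 0 {
			break
		}
		fmt.Println("processing batch:", handles)
		// Resume after the last handle, exactly like the loop above.
		seekHandle = handles[len(handles)-1] + 1
	}
}
```

Because seekHandle always advances to the last handle plus one, no handle is fetched twice even when the scan crosses batch boundaries.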
@@ -334,11 +334,59 @@ func (d *ddl) backfillColumn(t table.Table, columnInfo *model.ColumnInfo, reorgI
 	}
 }
 
-func (d *ddl) backfillColumnData(t table.Table, columnInfo *model.ColumnInfo, handles []int64, reorgInfo *reorgInfo) error {
-	var (
-		defaultVal types.Datum
-		err        error
-	)
+// backfillColumnInTxn backfills a batch of column data in a single transaction.
+// Each batch holds at most defaultSmallBatchSize rows.
+func (d *ddl) backfillColumnInTxn(t table.Table, colID int64, handles []int64, colMap map[int64]*types.FieldType,
+	defaultVal types.Datum, txn kv.Transaction) (int64, error) {
+	nextHandle := handles[0]
+	for _, handle := range handles {
+		log.Debug("[ddl] backfill column...", handle)
+		rowKey := t.RecordKey(handle)
+		rowVal, err := txn.Get(rowKey)
+		if terror.ErrorEqual(err, kv.ErrNotExist) {
+			// If row doesn't exist, skip it.
+			nextHandle = handle
+			continue
+		}
+		if err != nil {
+			return 0, errors.Trace(err)
+		}
+
+		rowColumns, err := tablecodec.DecodeRow(rowVal, colMap)
+		if err != nil {
+			return 0, errors.Trace(err)
+		}
+		if _, ok := rowColumns[colID]; ok {
+			// The column is already added by an update or insert statement, skip it.
+			nextHandle = handle
+			continue
+		}
+
+		newColumnIDs := make([]int64, 0, len(rowColumns)+1)
+		newRow := make([]types.Datum, 0, len(rowColumns)+1)
+		for colID, val := range rowColumns {
+			newColumnIDs = append(newColumnIDs, colID)
+			newRow = append(newRow, val)
+		}
+		newColumnIDs = append(newColumnIDs, colID)
+		newRow = append(newRow, defaultVal)
+		newRowVal, err := tablecodec.EncodeRow(newRow, newColumnIDs)
+		if err != nil {
+			return 0, errors.Trace(err)
+		}
+		err = txn.Set(rowKey, newRowVal)
+		if err != nil {
+			return 0, errors.Trace(err)
+		}
+		nextHandle = handle
+	}
+
+	return nextHandle, nil
+}
+
+func (d *ddl) backfillColumn(t table.Table, columnInfo *model.ColumnInfo, handles []int64, reorgInfo *reorgInfo) error {
+	var defaultVal types.Datum
+	var err error
 	if columnInfo.DefaultValue != nil {
 		defaultVal, _, err = table.GetColDefaultValue(nil, columnInfo)
 		if err != nil {
@@ -347,55 +395,36 @@ func (d *ddl) backfillColumnData(t table.Table, columnInfo *model.ColumnInfo, ha
 	} else if mysql.HasNotNullFlag(columnInfo.Flag) {
 		defaultVal = table.GetZeroValue(columnInfo)
 	}
+
 	colMap := make(map[int64]*types.FieldType)
 	for _, col := range t.Meta().Columns {
 		colMap[col.ID] = &col.FieldType
 	}
-	for _, handle := range handles {
-		log.Debug("[ddl] backfill column...", handle)
-		err := kv.RunInNewTxn(d.store, true, func(txn kv.Transaction) error {
+
+	var endIdx int
+	for len(handles) > 0 {
+		if len(handles) >= defaultSmallBatchSize {
+			endIdx = defaultSmallBatchSize
+		} else {
+			endIdx = len(handles)
+		}
+
+		err = kv.RunInNewTxn(d.store, true, func(txn kv.Transaction) error {
 			if err := d.isReorgRunnable(txn, ddlJobFlag); err != nil {
 				return errors.Trace(err)
 			}
-			rowKey := t.RecordKey(handle)
-			rowVal, err := txn.Get(rowKey)
-			if terror.ErrorEqual(err, kv.ErrNotExist) {
-				// If row doesn't exist, skip it.
-				return nil
+
+			nextHandle, err1 := d.backfillColumnInTxn(t, columnInfo.ID, handles[:endIdx], colMap, defaultVal, txn)
+			if err1 != nil {
+				return errors.Trace(err1)
 			}
-			if err != nil {
-				return errors.Trace(err)
-			}
-			rowColumns, err := tablecodec.DecodeRow(rowVal, colMap)
-			if err != nil {
-				return errors.Trace(err)
-			}
-			if _, ok := rowColumns[columnInfo.ID]; ok {
-				// The column is already added by update or insert statement, skip it.
-				return nil
-			}
-			newColumnIDs := make([]int64, 0, len(rowColumns)+1)
-			newRow := make([]types.Datum, 0, len(rowColumns)+1)
-			for colID, val := range rowColumns {
-				newColumnIDs = append(newColumnIDs, colID)
-				newRow = append(newRow, val)
-			}
-			newColumnIDs = append(newColumnIDs, columnInfo.ID)
-			newRow = append(newRow, defaultVal)
-			newRowVal, err := tablecodec.EncodeRow(newRow, newColumnIDs)
-			if err != nil {
-				return errors.Trace(err)
-			}
-			err = txn.Set(rowKey, newRowVal)
-			if err != nil {
-				return errors.Trace(err)
-			}
-			return errors.Trace(reorgInfo.UpdateHandle(txn, handle))
+			return errors.Trace(reorgInfo.UpdateHandle(txn, nextHandle))
 		})
 		if err != nil {
 			return errors.Trace(err)
 		}
+		handles = handles[endIdx:]
 	}
 	return nil
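The restructured backfillColumn above replaces the old one-transaction-per-row loop with one transaction per slice of at most defaultSmallBatchSize handles, and records the reorg progress handle once per slice instead of once per row. A hedged, self-contained sketch of just that slicing pattern (runInNewTxn is a toy stand-in for kv.RunInNewTxn, not the real API):

```go
package main

import "fmt"

const defaultSmallBatchSize = 128

// runInNewTxn stands in for kv.RunInNewTxn: run f inside a fresh transaction.
func runInNewTxn(f func() error) error { return f() }

// backfillInBatches consumes handles in slices of at most
// defaultSmallBatchSize, one transaction per slice.
func backfillInBatches(handles []int64) error {
	for len(handles) > 0 {
		endIdx := defaultSmallBatchSize
		if len(handles) < defaultSmallBatchSize {
			endIdx = len(handles)
		}
		batch := handles[:endIdx]
		if err := runInNewTxn(func() error {
			fmt.Printf("txn: backfilling %d rows (%d..%d)\n", len(batch), batch[0], batch[len(batch)-1])
			return nil // a real implementation would also persist the next handle here
		}); err != nil {
			return err
		}
		handles = handles[endIdx:]
	}
	return nil
}

func main() {
	handles := make([]int64, 300)
	for i := range handles {
		handles[i] = int64(i)
	}
	_ = backfillInBatches(handles) // expect batches of 128, 128, 44
}
```

Batching this way amortizes transaction overhead across many rows while keeping each transaction small enough to avoid long-held locks and large write sets.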
diff --git a/ddl/ddl_db_test.go b/ddl/ddl_db_test.go
index 525d667943..a999f90328 100644
--- a/ddl/ddl_db_test.go
+++ b/ddl/ddl_db_test.go
@@ -43,6 +43,8 @@ import (
 
 var _ = Suite(&testDBSuite{})
 
+const defaultBatchSize = 1024
+
 type testDBSuite struct {
 	db *sql.DB
@@ -123,7 +125,6 @@ func (s *testDBSuite) testAddUniqueIndexRollback(c *C) {
 	// t1 (c1 int, c2 int, c3 int, primary key(c1))
 	s.mustExec(c, "delete from t1")
 	// defaultBatchSize is equal to ddl.defaultBatchSize
-	defaultBatchSize := 1024
 	base := defaultBatchSize * 2
 	count := base
 	// add some rows
@@ -179,7 +180,7 @@ LOOP:
 
 func (s *testDBSuite) testAddIndex(c *C) {
 	done := make(chan struct{}, 1)
-	num := 100
+	num := defaultBatchSize + 10
 	// first add some rows
 	for i := 0; i < num; i++ {
 		s.mustExec(c, "insert into t1 values (?, ?, ?)", i, i, i)
@@ -402,7 +403,7 @@ func sessionExec(c *C, s kv.Storage, sql string) {
 
 func (s *testDBSuite) testAddColumn(c *C) {
 	done := make(chan struct{}, 1)
-	num := 100
+	num := defaultBatchSize + 10
 	// add some rows
 	for i := 0; i < num; i++ {
 		s.mustExec(c, "insert into t2 values (?, ?, ?)", i, i, i)
diff --git a/ddl/index.go b/ddl/index.go
index c15c71746d..9f4e7cbb67 100644
--- a/ddl/index.go
+++ b/ddl/index.go
@@ -372,6 +372,7 @@ func fetchRowColVals(txn kv.Transaction, t table.Table, handle int64, indexInfo
 }
 
 const defaultBatchSize = 1024
+const defaultSmallBatchSize = 128
 
 // How to add index in reorganization state?
 // 1. Generate a snapshot with special version.
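The test changes above bump num from 100 to defaultBatchSize + 10 so the reorg genuinely crosses a fetch boundary, and the new defaultSmallBatchSize bounds how many of those rows share one transaction. Assuming getSnapshotRows returns at most defaultBatchSize handles per round (which is what the test comment implies), the arithmetic looks like this:

```go
package main

import "fmt"

const (
	defaultBatchSize      = 1024 // handles fetched per snapshot scan round
	defaultSmallBatchSize = 128  // rows backfilled per transaction
)

func main() {
	num := defaultBatchSize + 10
	// Ceiling divisions: how many scan rounds and roughly how many txns.
	fetches := (num + defaultBatchSize - 1) / defaultBatchSize
	txns := (num + defaultSmallBatchSize - 1) / defaultSmallBatchSize
	fmt.Printf("%d rows -> %d snapshot fetches, about %d backfill txns\n", num, fetches, txns)
}
```

With 1034 rows the outer loop runs twice (1024 + 10 handles), so the tests now exercise both the multi-fetch path and the multi-transaction path.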
@@ -385,7 +386,7 @@ func (d *ddl) addTableIndex(t table.Table, indexInfo *model.IndexInfo, reorgInfo
 	count := job.GetRowCount()
 
 	for {
-		startTS := time.Now()
+		startTime := time.Now()
 		handles, err := d.getSnapshotRows(t, version, seekHandle)
 		if err != nil {
 			return errors.Trace(err)
@@ -396,7 +397,7 @@ func (d *ddl) addTableIndex(t table.Table, indexInfo *model.IndexInfo, reorgInfo
 		count += int64(len(handles))
 		seekHandle = handles[len(handles)-1] + 1
 		err = d.backfillTableIndex(t, indexInfo, handles, reorgInfo)
-		sub := time.Since(startTS).Seconds()
+		sub := time.Since(startTime).Seconds()
 		if err != nil {
 			log.Warnf("[ddl] added index for %v rows failed, take time %v", count, sub)
 			return errors.Trace(err)
@@ -405,7 +406,6 @@ func (d *ddl) addTableIndex(t table.Table, indexInfo *model.IndexInfo, reorgInfo
 		job.SetRowCount(count)
 		batchHandleDataHistogram.WithLabelValues(batchAddIdx).Observe(sub)
 		log.Infof("[ddl] added index for %v rows, take time %v", count, sub)
-
 	}
 }
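The hunks above rename startTS to startTime and drop a stray blank line; the surrounding code times one backfill round with time.Since and feeds the seconds into a labeled histogram. A sketch of that pattern with Prometheus client_golang (the metric name, buckets, and label below are illustrative, not TiDB's actual metric definitions):

```go
package main

import (
	"fmt"
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

// A per-batch duration histogram, labeled by batch type.
var batchHistogram = prometheus.NewHistogramVec(
	prometheus.HistogramOpts{
		Name:    "ddl_batch_add_idx_duration_seconds",
		Help:    "Duration of one index backfill batch.",
		Buckets: prometheus.ExponentialBuckets(0.001, 2, 16),
	},
	[]string{"type"},
)

func main() {
	prometheus.MustRegister(batchHistogram)

	startTime := time.Now()
	// ... backfill one batch of rows here ...
	time.Sleep(5 * time.Millisecond)

	// Observe the elapsed seconds, as the code above does per round.
	sub := time.Since(startTime).Seconds()
	batchHistogram.WithLabelValues("add_idx").Observe(sub)
	fmt.Printf("batch took %.3fs\n", sub)
}
```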
@@ -452,51 +452,71 @@ func (d *ddl) getSnapshotRows(t table.Table, version uint64, seekHandle int64) (
 	return handles, nil
 }
 
-func (d *ddl) backfillTableIndex(t table.Table, indexInfo *model.IndexInfo, handles []int64, reorgInfo *reorgInfo) error {
-	kvX := tables.NewIndex(t.Meta(), indexInfo)
-
+// backfillIndexInTxn backfills a batch of index data in a single transaction.
+// Each batch holds at most defaultSmallBatchSize rows.
+func (d *ddl) backfillIndexInTxn(t table.Table, kvIdx table.Index, handles []int64, txn kv.Transaction) (int64, error) {
+	nextHandle := handles[0]
 	for _, handle := range handles {
 		log.Debug("[ddl] backfill index...", handle)
+		rowKey, vals, err := fetchRowColVals(txn, t, handle, kvIdx.Meta())
+		if terror.ErrorEqual(err, kv.ErrNotExist) {
+			// Row doesn't exist, skip it.
+			nextHandle = handle
+			continue
+		}
+		if err != nil {
+			return 0, errors.Trace(err)
+		}
+
+		exist, _, err := kvIdx.Exist(txn, vals, handle)
+		if err != nil {
+			return 0, errors.Trace(err)
+		} else if exist {
+			// Index already exists, skip it.
+			nextHandle = handle
+			continue
+		}
+		err = txn.LockKeys(rowKey)
+		if err != nil {
+			return 0, errors.Trace(err)
+		}
+
+		// Create the index.
+		_, err = kvIdx.Create(txn, vals, handle)
+		if err != nil {
+			return 0, errors.Trace(err)
+		}
+		nextHandle = handle
+	}
+	return nextHandle, nil
+}
+
+func (d *ddl) backfillTableIndex(t table.Table, indexInfo *model.IndexInfo, handles []int64, reorgInfo *reorgInfo) error {
+	var endIdx int
+	kvIdx := tables.NewIndex(t.Meta(), indexInfo)
+	for len(handles) > 0 {
+		if len(handles) >= defaultSmallBatchSize {
+			endIdx = defaultSmallBatchSize
+		} else {
+			endIdx = len(handles)
+		}
 		err := kv.RunInNewTxn(d.store, true, func(txn kv.Transaction) error {
-			if err := d.isReorgRunnable(txn, ddlJobFlag); err != nil {
-				return errors.Trace(err)
-			}
-
-			rowKey, vals, err1 := fetchRowColVals(txn, t, handle, indexInfo)
-			if terror.ErrorEqual(err1, kv.ErrNotExist) {
-				// row doesn't exist, skip it.
-				return nil
+			if err1 := d.isReorgRunnable(txn, ddlJobFlag); err1 != nil {
+				return errors.Trace(err1)
 			}
+			nextHandle, err1 := d.backfillIndexInTxn(t, kvIdx, handles[:endIdx], txn)
 			if err1 != nil {
 				return errors.Trace(err1)
 			}
-
-			exist, _, err1 := kvX.Exist(txn, vals, handle)
-			if err1 != nil {
-				return errors.Trace(err1)
-			} else if exist {
-				// index already exists, skip it.
-				return nil
-			}
-			err1 = txn.LockKeys(rowKey)
-			if err1 != nil {
-				return errors.Trace(err1)
-			}
-
-			// create the index.
-			_, err1 = kvX.Create(txn, vals, handle)
-			if err1 != nil {
-				return errors.Trace(err1)
-			}
-
-			// update reorg next handle
-			return errors.Trace(reorgInfo.UpdateHandle(txn, handle))
+			// Update reorg next handle.
+			return errors.Trace(reorgInfo.UpdateHandle(txn, nextHandle))
 		})
-
 		if err != nil {
 			return errors.Trace(err)
 		}
+
+		handles = handles[endIdx:]
 	}
 	return nil
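The reworked backfillTableIndex relies on each row's backfill being idempotent: rows that vanished are skipped, and rows whose index entry already exists (from a previous attempt or concurrent DML) are skipped too, so re-running a batch after a retry does no harm. A self-contained toy (not the TiDB APIs) showing that skip logic:

```go
package main

import "fmt"

// toyTxn imitates the state touched by backfillIndexInTxn with plain maps.
type toyTxn struct {
	rows  map[int64]string // handle -> row value
	index map[string]int64 // indexed value -> handle
}

// backfillBatch indexes each handle, skipping missing rows and rows that
// are already indexed, and returns the last handle it reached.
func (t *toyTxn) backfillBatch(handles []int64) int64 {
	nextHandle := handles[0]
	for _, h := range handles {
		val, ok := t.rows[h]
		if !ok {
			nextHandle = h
			continue // row doesn't exist, skip it
		}
		if _, exist := t.index[val]; exist {
			nextHandle = h
			continue // index entry already exists, skip it
		}
		t.index[val] = h // "create the index"
		nextHandle = h
	}
	return nextHandle
}

func main() {
	txn := &toyTxn{
		rows:  map[int64]string{1: "a", 3: "c"},
		index: map[string]int64{"a": 1}, // handle 1 was indexed before a retry
	}
	next := txn.backfillBatch([]int64{1, 2, 3})
	fmt.Println("index:", txn.index, "next handle:", next)
}
```

Returning the last processed handle and persisting it via reorgInfo.UpdateHandle in the same transaction is what lets a crashed reorg resume near where it stopped instead of at the start of the batch.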