ddl: Batch backfilling index and column data (#1916)
This commit is contained in:
118
ddl/column.go
118
ddl/column.go
@ -170,7 +170,7 @@ func (d *ddl) onAddColumn(t *meta.Meta, job *model.Job) error {
|
||||
}
|
||||
if columnInfo.DefaultValue != nil || mysql.HasNotNullFlag(columnInfo.Flag) {
|
||||
err = d.runReorgJob(func() error {
|
||||
return d.backfillColumn(tbl, columnInfo, reorgInfo, job)
|
||||
return d.addTableColumn(tbl, columnInfo, reorgInfo, job)
|
||||
})
|
||||
if terror.ErrorEqual(err, errWaitReorgTimeout) {
|
||||
// if timeout, we should return, check for the owner and re-wait job done.
|
||||
@ -305,13 +305,13 @@ func (d *ddl) onDropColumn(t *meta.Meta, job *model.Job) error {
|
||||
// 3. For one row, if the row has been already deleted, skip to next row.
|
||||
// 4. If not deleted, check whether column data has existed, if existed, skip to next row.
|
||||
// 5. If column data doesn't exist, backfill the column with default value and then continue to handle next row.
|
||||
func (d *ddl) backfillColumn(t table.Table, columnInfo *model.ColumnInfo, reorgInfo *reorgInfo, job *model.Job) error {
|
||||
func (d *ddl) addTableColumn(t table.Table, columnInfo *model.ColumnInfo, reorgInfo *reorgInfo, job *model.Job) error {
|
||||
seekHandle := reorgInfo.Handle
|
||||
version := reorgInfo.SnapshotVer
|
||||
count := job.GetRowCount()
|
||||
|
||||
for {
|
||||
startTS := time.Now()
|
||||
startTime := time.Now()
|
||||
handles, err := d.getSnapshotRows(t, version, seekHandle)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
@ -321,8 +321,8 @@ func (d *ddl) backfillColumn(t table.Table, columnInfo *model.ColumnInfo, reorgI
|
||||
|
||||
count += int64(len(handles))
|
||||
seekHandle = handles[len(handles)-1] + 1
|
||||
sub := time.Since(startTS).Seconds()
|
||||
err = d.backfillColumnData(t, columnInfo, handles, reorgInfo)
|
||||
sub := time.Since(startTime).Seconds()
|
||||
err = d.backfillColumn(t, columnInfo, handles, reorgInfo)
|
||||
if err != nil {
|
||||
log.Warnf("[ddl] added column for %v rows failed, take time %v", count, sub)
|
||||
return errors.Trace(err)
|
||||
@ -334,11 +334,56 @@ func (d *ddl) backfillColumn(t table.Table, columnInfo *model.ColumnInfo, reorgI
|
||||
}
|
||||
}
|
||||
|
||||
func (d *ddl) backfillColumnData(t table.Table, columnInfo *model.ColumnInfo, handles []int64, reorgInfo *reorgInfo) error {
|
||||
var (
|
||||
defaultVal types.Datum
|
||||
err error
|
||||
)
|
||||
// backfillColumnInTxn deals with a part of backfilling column data in a Transaction.
|
||||
// This part of the column data rows is defaultSmallBatchSize.
|
||||
func (d *ddl) backfillColumnInTxn(t table.Table, colID int64, handles []int64, colMap map[int64]*types.FieldType,
|
||||
defaultVal types.Datum, txn kv.Transaction) (int64, error) {
|
||||
nextHandle := handles[0]
|
||||
for _, handle := range handles {
|
||||
log.Debug("[ddl] backfill column...", handle)
|
||||
rowKey := t.RecordKey(handle)
|
||||
rowVal, err := txn.Get(rowKey)
|
||||
if terror.ErrorEqual(err, kv.ErrNotExist) {
|
||||
// If row doesn't exist, skip it.
|
||||
continue
|
||||
}
|
||||
if err != nil {
|
||||
return 0, errors.Trace(err)
|
||||
}
|
||||
|
||||
rowColumns, err := tablecodec.DecodeRow(rowVal, colMap)
|
||||
if err != nil {
|
||||
return 0, errors.Trace(err)
|
||||
}
|
||||
if _, ok := rowColumns[colID]; ok {
|
||||
// The column is already added by update or insert statement, skip it.
|
||||
continue
|
||||
}
|
||||
|
||||
newColumnIDs := make([]int64, 0, len(rowColumns)+1)
|
||||
newRow := make([]types.Datum, 0, len(rowColumns)+1)
|
||||
for colID, val := range rowColumns {
|
||||
newColumnIDs = append(newColumnIDs, colID)
|
||||
newRow = append(newRow, val)
|
||||
}
|
||||
newColumnIDs = append(newColumnIDs, colID)
|
||||
newRow = append(newRow, defaultVal)
|
||||
newRowVal, err := tablecodec.EncodeRow(newRow, newColumnIDs)
|
||||
if err != nil {
|
||||
return 0, errors.Trace(err)
|
||||
}
|
||||
err = txn.Set(rowKey, newRowVal)
|
||||
if err != nil {
|
||||
return 0, errors.Trace(err)
|
||||
}
|
||||
}
|
||||
|
||||
return nextHandle, nil
|
||||
}
|
||||
|
||||
func (d *ddl) backfillColumn(t table.Table, columnInfo *model.ColumnInfo, handles []int64, reorgInfo *reorgInfo) error {
|
||||
var defaultVal types.Datum
|
||||
var err error
|
||||
if columnInfo.DefaultValue != nil {
|
||||
defaultVal, _, err = table.GetColDefaultValue(nil, columnInfo)
|
||||
if err != nil {
|
||||
@ -347,55 +392,36 @@ func (d *ddl) backfillColumnData(t table.Table, columnInfo *model.ColumnInfo, ha
|
||||
} else if mysql.HasNotNullFlag(columnInfo.Flag) {
|
||||
defaultVal = table.GetZeroValue(columnInfo)
|
||||
}
|
||||
|
||||
colMap := make(map[int64]*types.FieldType)
|
||||
for _, col := range t.Meta().Columns {
|
||||
colMap[col.ID] = &col.FieldType
|
||||
}
|
||||
for _, handle := range handles {
|
||||
log.Debug("[ddl] backfill column...", handle)
|
||||
err := kv.RunInNewTxn(d.store, true, func(txn kv.Transaction) error {
|
||||
|
||||
var endIdx int
|
||||
for len(handles) > 0 {
|
||||
if len(handles) >= defaultSmallBatchSize {
|
||||
endIdx = defaultSmallBatchSize
|
||||
} else {
|
||||
endIdx = len(handles)
|
||||
}
|
||||
|
||||
err = kv.RunInNewTxn(d.store, true, func(txn kv.Transaction) error {
|
||||
if err := d.isReorgRunnable(txn, ddlJobFlag); err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
rowKey := t.RecordKey(handle)
|
||||
rowVal, err := txn.Get(rowKey)
|
||||
if terror.ErrorEqual(err, kv.ErrNotExist) {
|
||||
// If row doesn't exist, skip it.
|
||||
return nil
|
||||
|
||||
nextHandle, err1 := d.backfillColumnInTxn(t, columnInfo.ID, handles[:endIdx], colMap, defaultVal, txn)
|
||||
if err1 != nil {
|
||||
return errors.Trace(err1)
|
||||
}
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
rowColumns, err := tablecodec.DecodeRow(rowVal, colMap)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
if _, ok := rowColumns[columnInfo.ID]; ok {
|
||||
// The column is already added by update or insert statement, skip it.
|
||||
return nil
|
||||
}
|
||||
newColumnIDs := make([]int64, 0, len(rowColumns)+1)
|
||||
newRow := make([]types.Datum, 0, len(rowColumns)+1)
|
||||
for colID, val := range rowColumns {
|
||||
newColumnIDs = append(newColumnIDs, colID)
|
||||
newRow = append(newRow, val)
|
||||
}
|
||||
newColumnIDs = append(newColumnIDs, columnInfo.ID)
|
||||
newRow = append(newRow, defaultVal)
|
||||
newRowVal, err := tablecodec.EncodeRow(newRow, newColumnIDs)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
err = txn.Set(rowKey, newRowVal)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
return errors.Trace(reorgInfo.UpdateHandle(txn, handle))
|
||||
return errors.Trace(reorgInfo.UpdateHandle(txn, nextHandle))
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
handles = handles[endIdx:]
|
||||
}
|
||||
|
||||
return nil
|
||||
|
||||
@ -43,6 +43,8 @@ import (
|
||||
|
||||
var _ = Suite(&testDBSuite{})
|
||||
|
||||
const defaultBatchSize = 1024
|
||||
|
||||
type testDBSuite struct {
|
||||
db *sql.DB
|
||||
|
||||
@ -123,7 +125,6 @@ func (s *testDBSuite) testAddUniqueIndexRollback(c *C) {
|
||||
// t1 (c1 int, c2 int, c3 int, primary key(c1))
|
||||
s.mustExec(c, "delete from t1")
|
||||
// defaultBatchSize is equal to ddl.defaultBatchSize
|
||||
defaultBatchSize := 1024
|
||||
base := defaultBatchSize * 2
|
||||
count := base
|
||||
// add some rows
|
||||
@ -179,7 +180,7 @@ LOOP:
|
||||
func (s *testDBSuite) testAddIndex(c *C) {
|
||||
done := make(chan struct{}, 1)
|
||||
|
||||
num := 100
|
||||
num := defaultBatchSize + 10
|
||||
// first add some rows
|
||||
for i := 0; i < num; i++ {
|
||||
s.mustExec(c, "insert into t1 values (?, ?, ?)", i, i, i)
|
||||
@ -402,7 +403,7 @@ func sessionExec(c *C, s kv.Storage, sql string) {
|
||||
func (s *testDBSuite) testAddColumn(c *C) {
|
||||
done := make(chan struct{}, 1)
|
||||
|
||||
num := 100
|
||||
num := defaultBatchSize + 10
|
||||
// add some rows
|
||||
for i := 0; i < num; i++ {
|
||||
s.mustExec(c, "insert into t2 values (?, ?, ?)", i, i, i)
|
||||
|
||||
92
ddl/index.go
92
ddl/index.go
@ -372,6 +372,7 @@ func fetchRowColVals(txn kv.Transaction, t table.Table, handle int64, indexInfo
|
||||
}
|
||||
|
||||
const defaultBatchSize = 1024
|
||||
const defaultSmallBatchSize = 128
|
||||
|
||||
// How to add index in reorganization state?
|
||||
// 1. Generate a snapshot with special version.
|
||||
@ -385,7 +386,7 @@ func (d *ddl) addTableIndex(t table.Table, indexInfo *model.IndexInfo, reorgInfo
|
||||
count := job.GetRowCount()
|
||||
|
||||
for {
|
||||
startTS := time.Now()
|
||||
startTime := time.Now()
|
||||
handles, err := d.getSnapshotRows(t, version, seekHandle)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
@ -396,7 +397,7 @@ func (d *ddl) addTableIndex(t table.Table, indexInfo *model.IndexInfo, reorgInfo
|
||||
count += int64(len(handles))
|
||||
seekHandle = handles[len(handles)-1] + 1
|
||||
err = d.backfillTableIndex(t, indexInfo, handles, reorgInfo)
|
||||
sub := time.Since(startTS).Seconds()
|
||||
sub := time.Since(startTime).Seconds()
|
||||
if err != nil {
|
||||
log.Warnf("[ddl] added index for %v rows failed, take time %v", count, sub)
|
||||
return errors.Trace(err)
|
||||
@ -405,7 +406,6 @@ func (d *ddl) addTableIndex(t table.Table, indexInfo *model.IndexInfo, reorgInfo
|
||||
job.SetRowCount(count)
|
||||
batchHandleDataHistogram.WithLabelValues(batchAddIdx).Observe(sub)
|
||||
log.Infof("[ddl] added index for %v rows, take time %v", count, sub)
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@ -452,51 +452,71 @@ func (d *ddl) getSnapshotRows(t table.Table, version uint64, seekHandle int64) (
|
||||
return handles, nil
|
||||
}
|
||||
|
||||
func (d *ddl) backfillTableIndex(t table.Table, indexInfo *model.IndexInfo, handles []int64, reorgInfo *reorgInfo) error {
|
||||
kvX := tables.NewIndex(t.Meta(), indexInfo)
|
||||
|
||||
// backfillIndexInTxn deals with a part of backfilling index data in a Transaction.
|
||||
// This part of the index data rows is defaultSmallBatchSize.
|
||||
func (d *ddl) backfillIndexInTxn(t table.Table, kvIdx table.Index, handles []int64, txn kv.Transaction) (int64, error) {
|
||||
nextHandle := handles[0]
|
||||
for _, handle := range handles {
|
||||
log.Debug("[ddl] backfill index...", handle)
|
||||
rowKey, vals, err := fetchRowColVals(txn, t, handle, kvIdx.Meta())
|
||||
if terror.ErrorEqual(err, kv.ErrNotExist) {
|
||||
// Row doesn't exist, skip it.
|
||||
nextHandle = handle
|
||||
continue
|
||||
}
|
||||
if err != nil {
|
||||
return 0, errors.Trace(err)
|
||||
}
|
||||
|
||||
exist, _, err := kvIdx.Exist(txn, vals, handle)
|
||||
if err != nil {
|
||||
return 0, errors.Trace(err)
|
||||
} else if exist {
|
||||
// Index already exists, skip it.
|
||||
nextHandle = handle
|
||||
continue
|
||||
}
|
||||
err = txn.LockKeys(rowKey)
|
||||
if err != nil {
|
||||
return 0, errors.Trace(err)
|
||||
}
|
||||
|
||||
// Create the index.
|
||||
_, err = kvIdx.Create(txn, vals, handle)
|
||||
if err != nil {
|
||||
return 0, errors.Trace(err)
|
||||
}
|
||||
nextHandle = handle
|
||||
}
|
||||
return nextHandle, nil
|
||||
}
|
||||
|
||||
func (d *ddl) backfillTableIndex(t table.Table, indexInfo *model.IndexInfo, handles []int64, reorgInfo *reorgInfo) error {
|
||||
var endIdx int
|
||||
kvIdx := tables.NewIndex(t.Meta(), indexInfo)
|
||||
for len(handles) > 0 {
|
||||
if len(handles) >= defaultSmallBatchSize {
|
||||
endIdx = defaultSmallBatchSize
|
||||
} else {
|
||||
endIdx = len(handles)
|
||||
}
|
||||
|
||||
err := kv.RunInNewTxn(d.store, true, func(txn kv.Transaction) error {
|
||||
if err := d.isReorgRunnable(txn, ddlJobFlag); err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
rowKey, vals, err1 := fetchRowColVals(txn, t, handle, indexInfo)
|
||||
if terror.ErrorEqual(err1, kv.ErrNotExist) {
|
||||
// row doesn't exist, skip it.
|
||||
return nil
|
||||
if err1 := d.isReorgRunnable(txn, ddlJobFlag); err1 != nil {
|
||||
return errors.Trace(err1)
|
||||
}
|
||||
nextHandle, err1 := d.backfillIndexInTxn(t, kvIdx, handles[:endIdx], txn)
|
||||
if err1 != nil {
|
||||
return errors.Trace(err1)
|
||||
}
|
||||
|
||||
exist, _, err1 := kvX.Exist(txn, vals, handle)
|
||||
if err1 != nil {
|
||||
return errors.Trace(err1)
|
||||
} else if exist {
|
||||
// index already exists, skip it.
|
||||
return nil
|
||||
}
|
||||
err1 = txn.LockKeys(rowKey)
|
||||
if err1 != nil {
|
||||
return errors.Trace(err1)
|
||||
}
|
||||
|
||||
// create the index.
|
||||
_, err1 = kvX.Create(txn, vals, handle)
|
||||
if err1 != nil {
|
||||
return errors.Trace(err1)
|
||||
}
|
||||
|
||||
// update reorg next handle
|
||||
return errors.Trace(reorgInfo.UpdateHandle(txn, handle))
|
||||
// Update reorg next handle.
|
||||
return errors.Trace(reorgInfo.UpdateHandle(txn, nextHandle))
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
handles = handles[endIdx:]
|
||||
}
|
||||
|
||||
return nil
|
||||
|
||||
Reference in New Issue
Block a user