ddl: Speed up adding index phase (#2341)

Lynn
2017-01-04 12:32:58 +08:00
committed by Han Fei
parent 8a03344d05
commit cc5fcae0a3


@@ -14,6 +14,8 @@
package ddl
import (
"sort"
"sync"
"time"
"github.com/juju/errors"
@@ -324,50 +326,115 @@ func (d *ddl) onDropIndex(t *meta.Meta, job *model.Job) error {
return errors.Trace(err)
}
func (d *ddl) fetchRowColVals(txn kv.Transaction, t table.Table, batchOpInfo *indexBatchOpInfo, seekHandle int64) error {
cols := t.Cols()
idxInfo := batchOpInfo.tblIndex.Meta()
err := d.iterateSnapshotRows(t, txn.StartTS(), seekHandle,
func (d *ddl) fetchRowColVals(txn kv.Transaction, t table.Table, taskOpInfo *indexTaskOpInfo, handleInfo *handleInfo) (
[]*indexRecord, *taskResult) {
handleCnt := defaultTaskHandleCnt
rawRecords := make([][]byte, 0, handleCnt)
idxRecords := make([]*indexRecord, 0, handleCnt)
ret := &taskResult{doneHandle: handleInfo.startHandle}
err := d.iterateSnapshotRows(t, txn.StartTS(), handleInfo.startHandle,
func(h int64, rowKey kv.Key, rawRecord []byte) (bool, error) {
rowMap, err := tablecodec.DecodeRow(rawRecord, batchOpInfo.colMap)
if err != nil {
return false, errors.Trace(err)
}
idxVal := make([]types.Datum, 0, len(idxInfo.Columns))
for _, v := range idxInfo.Columns {
col := cols[v.Offset]
idxVal = append(idxVal, rowMap[col.ID])
}
indexRecord := &indexRecord{handle: h, key: rowKey, vals: idxVal}
batchOpInfo.idxRecords = append(batchOpInfo.idxRecords, indexRecord)
if len(batchOpInfo.idxRecords) == defaultSmallBatchCnt {
rawRecords = append(rawRecords, rawRecord)
indexRecord := &indexRecord{handle: h, key: rowKey}
idxRecords = append(idxRecords, indexRecord)
if len(idxRecords) == handleCnt || handleInfo.isFinished(h) {
return false, nil
}
return true, nil
})
if err != nil {
return errors.Trace(err)
} else if len(batchOpInfo.idxRecords) == 0 {
return nil
ret.err = errors.Trace(err)
return nil, ret
}
count := len(batchOpInfo.idxRecords)
batchOpInfo.addedCount += int64(count)
batchOpInfo.handle = batchOpInfo.idxRecords[count-1].handle
return nil
ret.count = len(idxRecords)
if ret.count > 0 {
ret.doneHandle = idxRecords[ret.count-1].handle
}
// Be sure to do this operation only once.
if !handleInfo.isSent {
// Notify the spawner to start the next task.
taskOpInfo.nextCh <- ret.doneHandle
// Record the last handle.
// This ensures that the task's handle range doesn't change, so that
// even if the transaction retries, it can't affect the other tasks.
handleInfo.endHandle = ret.doneHandle
handleInfo.isSent = true
}
if ret.count == 0 {
return nil, ret
}
cols := t.Cols()
idxInfo := taskOpInfo.tblIndex.Meta()
for i, idxRecord := range idxRecords {
rowMap, err := tablecodec.DecodeRow(rawRecords[i], taskOpInfo.colMap)
if err != nil {
ret.err = errors.Trace(err)
return nil, ret
}
idxVal := make([]types.Datum, 0, len(idxInfo.Columns))
for _, v := range idxInfo.Columns {
col := cols[v.Offset]
idxVal = append(idxVal, rowMap[col.ID])
}
idxRecord.vals = idxVal
}
return idxRecords, ret
}
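Worth noting about the ordering in this function: the raw rows are fetched and the next start handle is sent on nextCh before any decoding happens, so the next task's snapshot scan can overlap with this task's CPU-bound decode loop. That overlap between fetching and decoding appears to be a large part of the speedup this commit is after.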
const defaultBatchCnt = 1024
const defaultSmallBatchCnt = 128
const (
defaultBatchCnt = 1024
defaultSmallBatchCnt = 128
defaultTaskHandleCnt = 128
defaultTaskCnt = 16
)
// taskResult is the result of the task.
type taskResult struct {
count int // The number of records that have been processed in the task.
doneHandle int64 // This is the last reorg handle that has been processed.
err error
}
type taskRetSlice []*taskResult
func (b taskRetSlice) Len() int { return len(b) }
func (b taskRetSlice) Less(i, j int) bool { return b[i].doneHandle < b[j].doneHandle }
func (b taskRetSlice) Swap(i, j int) { b[i], b[j] = b[j], b[i] }
// indexRecord is the record information of an index.
type indexRecord struct {
handle int64
key []byte // It's used to lock a record. We store it to avoid encoding it again.
vals []types.Datum // The index values.
}
// indexTaskOpInfo records the information that is needed in the task.
type indexTaskOpInfo struct {
tblIndex table.Index
colMap map[int64]*types.FieldType // It maps an indexed column's ID to its field type.
taskRetCh chan *taskResult // It collects the results of all tasks.
nextCh chan int64 // It notifies the next task to start.
}
// How to add index in the reorganization state?
// 1. Generate a snapshot with a special version.
// 2. Traverse the snapshot, get every row in the table.
// 3. For each row, if the row has already been deleted, skip to the next row.
// 4. If not deleted, check whether the index already exists; if it does, skip to the next row.
// 5. If the index doesn't exist, create the index and then continue to handle the next row.
// We process defaultTaskCnt tasks concurrently. Each task deals with one handle range of the index records.
// The handle range size is defaultTaskHandleCnt.
// Because each handle range depends on where the previous one ended, the handle ranges have to be acquired serially;
// the real concurrent processing only starts after a task's handle range has been acquired.
// The operation flow of each task is as follows:
// 1. Open a goroutine. Traverse the snapshot to obtain the task's handle range, collecting the corresponding
//    row keys and raw records along the way. Then notify the spawner to start the next task.
// 2. Decode the raw records of this task to get the corresponding index values.
// 3. Deal with these index records one by one. If an index record already exists, skip to the next row.
//    If it doesn't exist, create the index and then continue to handle the next row.
// 4. When the handle range is finished, return the corresponding task result.
// The above operations are all completed within one transaction.
// Once the concurrent tasks are done, the results returned by the tasks are sorted by handle. We then traverse the
// task results, accumulating the total number of processed rows and advancing the processed handle value; if a
// result carries an error, the traversal stops there.
// Finally, we update the job's total row count and store the last completed handle value.
// (A standalone sketch of this pipeline appears after addTableIndex below.)
func (d *ddl) addTableIndex(t table.Table, indexInfo *model.IndexInfo, reorgInfo *reorgInfo, job *model.Job) error {
cols := t.Cols()
colMap := make(map[int64]*types.FieldType)
@@ -375,48 +442,183 @@ func (d *ddl) addTableIndex(t table.Table, indexInfo *model.IndexInfo, reorgInfo
col := cols[v.Offset]
colMap[col.ID] = &col.FieldType
}
batchCnt := defaultSmallBatchCnt
batchOpInfo := &indexBatchOpInfo{
tblIndex: tables.NewIndex(t.Meta(), indexInfo),
addedCount: job.GetRowCount(),
colMap: colMap,
handle: reorgInfo.Handle,
idxRecords: make([]*indexRecord, 0, batchCnt),
taskCnt := defaultTaskCnt
taskOpInfo := &indexTaskOpInfo{
tblIndex: tables.NewIndex(t.Meta(), indexInfo),
colMap: colMap,
nextCh: make(chan int64, 1),
taskRetCh: make(chan *taskResult, taskCnt),
}
seekHandle := reorgInfo.Handle
addedCount := job.GetRowCount()
taskStartHandle := reorgInfo.Handle
for {
startTime := time.Now()
err := kv.RunInNewTxn(d.store, true, func(txn kv.Transaction) error {
err1 := d.isReorgRunnable(txn, ddlJobFlag)
if err1 != nil {
return errors.Trace(err1)
wg := sync.WaitGroup{}
for i := 0; i < taskCnt; i++ {
wg.Add(1)
go d.doBackfillIndexTask(t, taskOpInfo, taskStartHandle, &wg)
doneHandle := <-taskOpInfo.nextCh
// There is no data to seek.
if doneHandle == taskStartHandle {
break
}
batchOpInfo.idxRecords = batchOpInfo.idxRecords[:0]
err1 = d.backfillIndexInTxn(t, txn, batchOpInfo, seekHandle)
taskStartHandle = doneHandle + 1
}
wg.Wait()
retCnt := len(taskOpInfo.taskRetCh)
taskAddedCount, doneHandle, err := getCountAndHandle(taskOpInfo)
// Update the reorg handle that has been processed.
if taskAddedCount != 0 {
err1 := kv.RunInNewTxn(d.store, true, func(txn kv.Transaction) error {
return errors.Trace(reorgInfo.UpdateHandle(txn, doneHandle+1))
})
if err1 != nil {
return errors.Trace(err1)
if err == nil {
err = err1
} else {
log.Warnf("[ddl] add index failed when update handle %d, err %v", doneHandle, err)
}
}
// Update the reorg handle that has been processed.
return errors.Trace(reorgInfo.UpdateHandle(txn, batchOpInfo.handle))
})
}
addedCount += int64(taskAddedCount)
sub := time.Since(startTime).Seconds()
if err != nil {
log.Warnf("[ddl] added index for %v rows failed, take time %v", batchOpInfo.addedCount, sub)
log.Warnf("[ddl] total added index for %d rows, this task add index for %d failed, take time %v",
addedCount, taskAddedCount, sub)
return errors.Trace(err)
}
job.SetRowCount(batchOpInfo.addedCount)
job.SetRowCount(addedCount)
batchHandleDataHistogram.WithLabelValues(batchAddIdx).Observe(sub)
log.Infof("[ddl] added index for %v rows, take time %v", batchOpInfo.addedCount, sub)
log.Infof("[ddl] total added index for %d rows, this task added index for %d rows, take time %v",
addedCount, taskAddedCount, sub)
if len(batchOpInfo.idxRecords) < batchCnt {
if retCnt < taskCnt {
return nil
}
seekHandle = batchOpInfo.handle + 1
}
}
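To make the flow described in the comment above addTableIndex easier to follow in isolation, here is a minimal, self-contained sketch of the same pipeline shape. It is an illustration of the pattern, not TiDB's API: scanRange is a hypothetical stand-in for the snapshot iteration, plain ints stand in for handles, and the constants are arbitrary. Each task claims its handle range, immediately tells the spawner where the next range starts, and only then does its expensive work, so scans and backfills overlap.

package main

import (
    "fmt"
    "sync"
)

// result mirrors the shape of taskResult above: rows handled and the last handle done.
type result struct {
    count      int
    doneHandle int
}

// scanRange is a hypothetical stand-in for iterating the snapshot: it "scans"
// up to batch handles starting at start, returning how many rows it saw and
// the last handle it reached.
func scanRange(start, batch, maxHandle int) (count, done int) {
    for h := start; h < start+batch && h <= maxHandle; h++ {
        count++
        done = h
    }
    return count, done
}

func main() {
    const taskCnt, batch, maxHandle = 4, 128, 1000

    nextCh := make(chan int, 1)         // hands the next start handle to the spawner
    retCh := make(chan result, taskCnt) // collects task results

    start := 0
    for {
        var wg sync.WaitGroup
        spawned := 0
        for i := 0; i < taskCnt; i++ {
            wg.Add(1)
            spawned++
            go func(startHandle int) {
                defer wg.Done()
                count, done := scanRange(startHandle, batch, maxHandle)
                if count == 0 {
                    nextCh <- startHandle // no progress: the spawner sees "no data to seek"
                    retCh <- result{}
                    return
                }
                nextCh <- done + 1 // range is claimed, let the next task start
                // ...the expensive decode/backfill work happens here,
                // overlapping with the next task's snapshot scan...
                retCh <- result{count: count, doneHandle: done}
            }(start)
            next := <-nextCh
            if next == start { // no data left to seek
                break
            }
            start = next
        }
        wg.Wait()
        total := 0
        for i := 0; i < spawned; i++ {
            total += (<-retCh).count
        }
        fmt.Println("round handled", total, "rows; next start handle", start)
        if spawned < taskCnt || total == 0 {
            return
        }
    }
}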
// handleInfo records the start and end handles that are used in a task.
type handleInfo struct {
startHandle int64
endHandle int64
isSent bool // It ensures that the endHandle is assigned only once and is sent once.
}
func (h *handleInfo) isFinished(input int64) bool {
if !h.isSent || input < h.endHandle {
return false
}
return true
}
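A small walk-through of why isFinished bounds a retried transaction (a standalone sketch reusing the handleInfo shape above, with invented handle values): once the task has sent its first doneHandle, endHandle is pinned, so a retry that rescans the snapshot stops at the same boundary instead of stealing rows from the next task.

package main

import "fmt"

type handleInfo struct {
    startHandle int64
    endHandle   int64
    isSent      bool
}

func (h *handleInfo) isFinished(input int64) bool {
    if !h.isSent || input < h.endHandle {
        return false
    }
    return true
}

func main() {
    h := &handleInfo{startHandle: 100}

    // First attempt: nothing sent yet, so the scan is bounded only by the
    // batch size; suppose it stops at handle 227 and notifies the next task.
    fmt.Println(h.isFinished(227)) // false: isSent is still false, keep scanning
    h.endHandle, h.isSent = 227, true

    // Transaction retry: the snapshot is rescanned from startHandle, but the
    // range is now pinned to [100, 227]; rows past 227 belong to the next task.
    fmt.Println(h.isFinished(150)) // false: still inside this task's range
    fmt.Println(h.isFinished(227)) // true: reached the pinned end handle
}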
func getCountAndHandle(taskOpInfo *indexTaskOpInfo) (int64, int64, error) {
l := len(taskOpInfo.taskRetCh)
taskRets := make([]*taskResult, 0, l)
for i := 0; i < l; i++ {
taskRet := <-taskOpInfo.taskRetCh
taskRets = append(taskRets, taskRet)
}
sort.Sort(taskRetSlice(taskRets))
taskAddedCount, currHandle := int64(0), int64(0)
var err error
for _, ret := range taskRets {
if ret.err != nil {
err = ret.err
break
}
taskAddedCount += int64(ret.count)
currHandle = ret.doneHandle
}
return taskAddedCount, currHandle, errors.Trace(err)
}
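For intuition, here is getCountAndHandle's aggregation rule run on made-up task results, as a standalone sketch (the taskResult and taskRetSlice definitions are copied from above; the numbers are invented): results arrive on the channel in completion order, are sorted by doneHandle, and the first error truncates both the accumulated count and the reorg handle at the last fully successful range.

package main

import (
    "errors"
    "fmt"
    "sort"
)

type taskResult struct {
    count      int
    doneHandle int64
    err        error
}

type taskRetSlice []*taskResult

func (b taskRetSlice) Len() int           { return len(b) }
func (b taskRetSlice) Less(i, j int) bool { return b[i].doneHandle < b[j].doneHandle }
func (b taskRetSlice) Swap(i, j int)      { b[i], b[j] = b[j], b[i] }

func main() {
    // Results arrive in completion order, not handle order.
    rets := taskRetSlice{
        {count: 128, doneHandle: 383},
        {count: 128, doneHandle: 127},
        {count: 40, doneHandle: 511, err: errors.New("write conflict")},
        {count: 128, doneHandle: 255},
    }
    sort.Sort(rets)

    var total, curr int64
    var err error
    for _, ret := range rets {
        if ret.err != nil {
            err = ret.err
            break
        }
        total += int64(ret.count)
        curr = ret.doneHandle
    }
    // The failing range's 40 rows are not counted, and the reorg
    // handle stops at the last fully successful range.
    fmt.Println(total, curr, err) // 384 383 write conflict
}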
func (d *ddl) doBackfillIndexTask(t table.Table, taskOpInfo *indexTaskOpInfo, startHandle int64, wg *sync.WaitGroup) {
defer wg.Done()
ret := new(taskResult)
handleInfo := &handleInfo{startHandle: startHandle}
err := kv.RunInNewTxn(d.store, true, func(txn kv.Transaction) error {
err1 := d.isReorgRunnable(txn, ddlJobFlag)
if err1 != nil {
return errors.Trace(err1)
}
ret = d.doBackfillIndexTaskInTxn(t, txn, taskOpInfo, handleInfo)
if ret.err != nil {
return errors.Trace(ret.err)
}
return nil
})
if err != nil {
ret.err = errors.Trace(err)
}
// Fetching the row keys failed, so no handle was sent; send back the
// unchanged start handle to unblock the spawner.
if !handleInfo.isSent {
taskOpInfo.nextCh <- startHandle
}
taskOpInfo.taskRetCh <- ret
}
// doBackfillIndexTaskInTxn deals with a part of backfilling index data in a transaction.
// This part handles at most defaultTaskHandleCnt rows of index data.
func (d *ddl) doBackfillIndexTaskInTxn(t table.Table, txn kv.Transaction, taskOpInfo *indexTaskOpInfo,
handleInfo *handleInfo) *taskResult {
idxRecords, taskRet := d.fetchRowColVals(txn, t, taskOpInfo, handleInfo)
if taskRet.err != nil {
taskRet.err = errors.Trace(taskRet.err)
return taskRet
}
for _, idxRecord := range idxRecords {
log.Debug("[ddl] backfill index...", idxRecord.handle)
err := txn.LockKeys(idxRecord.key)
if err != nil {
taskRet.err = errors.Trace(err)
return taskRet
}
// Create the index.
handle, err := taskOpInfo.tblIndex.Create(txn, idxRecord.vals, idxRecord.handle)
if err != nil {
if terror.ErrorEqual(err, kv.ErrKeyExists) && idxRecord.handle == handle {
// Index already exists, skip it.
continue
}
taskRet.err = errors.Trace(err)
return taskRet
}
}
return taskRet
}
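The skip-if-exists branch above is what makes the backfill idempotent: re-running over a row whose index entry already exists (for example after a transaction retry) is a no-op, while a duplicate key pointing at a different handle is a genuine conflict. Below is a minimal map-backed sketch of that pattern; fakeIndex and errKeyExists are invented for illustration, while TiDB's real table.Index.Create works against the kv layer.

package main

import (
    "errors"
    "fmt"
)

var errKeyExists = errors.New("key already exists")

type fakeIndex struct {
    entries map[string]int64 // index key -> row handle
}

// create mimics the shape of table.Index.Create as used in this commit:
// on a duplicate key it returns the handle the existing entry points to.
func (idx *fakeIndex) create(key string, handle int64) (int64, error) {
    if existing, ok := idx.entries[key]; ok {
        return existing, errKeyExists
    }
    idx.entries[key] = handle
    return handle, nil
}

func backfill(idx *fakeIndex, key string, handle int64) error {
    existing, err := idx.create(key, handle)
    if err != nil {
        if errors.Is(err, errKeyExists) && existing == handle {
            return nil // index already exists for this row, skip it
        }
        return err // duplicate key from a *different* row: genuine conflict
    }
    return nil
}

func main() {
    idx := &fakeIndex{entries: map[string]int64{}}
    fmt.Println(backfill(idx, "k1", 1)) // <nil>: created
    fmt.Println(backfill(idx, "k1", 1)) // <nil>: already there, skipped
    fmt.Println(backfill(idx, "k1", 2)) // key already exists: real conflict
}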
func (d *ddl) dropTableIndex(indexInfo *model.IndexInfo, job *model.Job) error {
startKey := tablecodec.EncodeTableIndexPrefix(job.TableID, indexInfo.ID)
// It's asynchronous, so we don't need to wait for it to complete.
deleteAll := -1
_, _, err := d.delKeysWithStartKey(startKey, startKey, ddlJobFlag, job, deleteAll)
return errors.Trace(err)
}
func findIndexByName(idxName string, indices []*model.IndexInfo) *model.IndexInfo {
for _, idx := range indices {
if idx.Name.L == idxName {
return idx
}
}
return nil
}
func allocateIndexID(tblInfo *model.TableInfo) int64 {
tblInfo.MaxIndexID++
return tblInfo.MaxIndexID
}
// recordIterFunc is used for low-level record iteration.
type recordIterFunc func(h int64, rowKey kv.Key, rawRecord []byte) (more bool, err error)
@@ -461,67 +663,3 @@ func (d *ddl) iterateSnapshotRows(t table.Table, version uint64, seekHandle int6
return nil
}
type indexBatchOpInfo struct {
tblIndex table.Index
addedCount int64
handle int64 // This is the last reorg handle that has been processed.
colMap map[int64]*types.FieldType
idxRecords []*indexRecord
}
type indexRecord struct {
handle int64
key []byte
vals []types.Datum
}
// backfillIndexInTxn deals with a part of backfilling index data in a Transaction.
// This part of the index data rows is defaultSmallBatchCnt.
func (d *ddl) backfillIndexInTxn(t table.Table, txn kv.Transaction, batchOpInfo *indexBatchOpInfo, seekHandle int64) error {
err := d.fetchRowColVals(txn, t, batchOpInfo, seekHandle)
if err != nil {
return errors.Trace(err)
}
for _, idxRecord := range batchOpInfo.idxRecords {
log.Debug("[ddl] backfill index...", idxRecord.handle)
err = txn.LockKeys(idxRecord.key)
if err != nil {
return errors.Trace(err)
}
// Create the index.
handle, err := batchOpInfo.tblIndex.Create(txn, idxRecord.vals, idxRecord.handle)
if err != nil {
if terror.ErrorEqual(err, kv.ErrKeyExists) && idxRecord.handle == handle {
// Index already exists, skip it.
continue
}
return errors.Trace(err)
}
}
return nil
}
func (d *ddl) dropTableIndex(indexInfo *model.IndexInfo, job *model.Job) error {
startKey := tablecodec.EncodeTableIndexPrefix(job.TableID, indexInfo.ID)
// It's asynchronous, so we don't need to wait for it to complete.
deleteAll := -1
_, _, err := d.delKeysWithStartKey(startKey, startKey, ddlJobFlag, job, deleteAll)
return errors.Trace(err)
}
func findIndexByName(idxName string, indices []*model.IndexInfo) *model.IndexInfo {
for _, idx := range indices {
if idx.Name.L == idxName {
return idx
}
}
return nil
}
func allocateIndexID(tblInfo *model.TableInfo) int64 {
tblInfo.MaxIndexID++
return tblInfo.MaxIndexID
}