diff --git a/ddl/index.go b/ddl/index.go
index f5a0ca7bca..04f7881d75 100644
--- a/ddl/index.go
+++ b/ddl/index.go
@@ -14,6 +14,8 @@ package ddl

 import (
+	"sort"
+	"sync"
 	"time"

 	"github.com/juju/errors"
@@ -324,50 +326,115 @@ func (d *ddl) onDropIndex(t *meta.Meta, job *model.Job) error {
 	return errors.Trace(err)
 }

-func (d *ddl) fetchRowColVals(txn kv.Transaction, t table.Table, batchOpInfo *indexBatchOpInfo, seekHandle int64) error {
-	cols := t.Cols()
-	idxInfo := batchOpInfo.tblIndex.Meta()
-
-	err := d.iterateSnapshotRows(t, txn.StartTS(), seekHandle,
+func (d *ddl) fetchRowColVals(txn kv.Transaction, t table.Table, taskOpInfo *indexTaskOpInfo, handleInfo *handleInfo) (
+	[]*indexRecord, *taskResult) {
+	handleCnt := defaultTaskHandleCnt
+	rawRecords := make([][]byte, 0, handleCnt)
+	idxRecords := make([]*indexRecord, 0, handleCnt)
+	ret := &taskResult{doneHandle: handleInfo.startHandle}
+	err := d.iterateSnapshotRows(t, txn.StartTS(), handleInfo.startHandle,
 		func(h int64, rowKey kv.Key, rawRecord []byte) (bool, error) {
-			rowMap, err := tablecodec.DecodeRow(rawRecord, batchOpInfo.colMap)
-			if err != nil {
-				return false, errors.Trace(err)
-			}
-			idxVal := make([]types.Datum, 0, len(idxInfo.Columns))
-			for _, v := range idxInfo.Columns {
-				col := cols[v.Offset]
-				idxVal = append(idxVal, rowMap[col.ID])
-			}
-
-			indexRecord := &indexRecord{handle: h, key: rowKey, vals: idxVal}
-			batchOpInfo.idxRecords = append(batchOpInfo.idxRecords, indexRecord)
-			if len(batchOpInfo.idxRecords) == defaultSmallBatchCnt {
+			rawRecords = append(rawRecords, rawRecord)
+			indexRecord := &indexRecord{handle: h, key: rowKey}
+			idxRecords = append(idxRecords, indexRecord)
+			if len(idxRecords) == handleCnt || handleInfo.isFinished(h) {
 				return false, nil
 			}
 			return true, nil
 		})
 	if err != nil {
-		return errors.Trace(err)
-	} else if len(batchOpInfo.idxRecords) == 0 {
-		return nil
+		ret.err = errors.Trace(err)
+		return nil, ret
 	}

-	count := len(batchOpInfo.idxRecords)
-	batchOpInfo.addedCount += int64(count)
-	batchOpInfo.handle = batchOpInfo.idxRecords[count-1].handle
-	return nil
+	ret.count = len(idxRecords)
+	if ret.count > 0 {
+		ret.doneHandle = idxRecords[ret.count-1].handle
+	}
+	// Be sure to do this operation only once.
+	if !handleInfo.isSent {
+		// Notify the next task that it can start.
+		taskOpInfo.nextCh <- ret.doneHandle
+		// Record the end handle.
+		// This ensures that the handle range of the task doesn't change;
+		// even if the transaction retries, it can't affect the other tasks.
+		handleInfo.endHandle = ret.doneHandle
+		handleInfo.isSent = true
+	}
+	if ret.count == 0 {
+		return nil, ret
+	}
+
+	cols := t.Cols()
+	idxInfo := taskOpInfo.tblIndex.Meta()
+	for i, idxRecord := range idxRecords {
+		rowMap, err := tablecodec.DecodeRow(rawRecords[i], taskOpInfo.colMap)
+		if err != nil {
+			ret.err = errors.Trace(err)
+			return nil, ret
+		}
+		idxVal := make([]types.Datum, 0, len(idxInfo.Columns))
+		for _, v := range idxInfo.Columns {
+			col := cols[v.Offset]
+			idxVal = append(idxVal, rowMap[col.ID])
+		}
+		idxRecord.vals = idxVal
+	}
+	return idxRecords, ret
 }

-const defaultBatchCnt = 1024
-const defaultSmallBatchCnt = 128
+const (
+	defaultBatchCnt      = 1024
+	defaultSmallBatchCnt = 128
+	defaultTaskHandleCnt = 128
+	defaultTaskCnt       = 16
+)
+
+// taskResult is the result of a task.
+type taskResult struct {
+	count      int   // The number of records that have been processed in the task.
+	doneHandle int64 // This is the last reorg handle that has been processed.
+	err        error
+}
+
+type taskRetSlice []*taskResult
+
+func (b taskRetSlice) Len() int           { return len(b) }
+func (b taskRetSlice) Less(i, j int) bool { return b[i].doneHandle < b[j].doneHandle }
+func (b taskRetSlice) Swap(i, j int)      { b[i], b[j] = b[j], b[i] }
+
+// indexRecord is the record information of an index.
+type indexRecord struct {
+	handle int64
+	key    []byte        // It's used to lock a record. Kept to avoid encoding the key again.
+	vals   []types.Datum // The index values.
+}
+
+// indexTaskOpInfo records the information needed by a task.
+type indexTaskOpInfo struct {
+	tblIndex  table.Index
+	colMap    map[int64]*types.FieldType // Maps the index columns' IDs to their field types; used to decode rows.
+	taskRetCh chan *taskResult           // Collects the results of all tasks.
+	nextCh    chan int64                 // Notifies the next task to start.
+}

 // How to add index in reorganization state?
-// 1. Generate a snapshot with special version.
-// 2. Traverse the snapshot, get every row in the table.
-// 3. For one row, if the row has been already deleted, skip to next row.
-// 4. If not deleted, check whether index has existed, if existed, skip to next row.
-// 5. If index doesn't exist, create the index and then continue to handle next row.
+// Concurrently process defaultTaskCnt tasks. Each task deals with one handle range of the index records.
+// The handle range size is defaultTaskHandleCnt.
+// Because each handle range depends on the previous one, the handle ranges have to be obtained serially.
+// The real concurrent processing starts only after a task's handle range has been acquired.
+// The operation flow of each task is as follows:
+// 1. Open a goroutine. Traverse the snapshot to obtain the handle range, recording the corresponding row key and
+// raw row value along the way. Then notify the next task to start.
+// 2. Decode the raw row values of this task to get the corresponding index values.
+// 3. Deal with these index records one by one. If an index record already exists, skip to the next row.
+// If the index doesn't exist, create the index and then continue to handle the next row.
+// 4. When the handle range is completed, return the corresponding task result.
+// The above operations are all performed within one transaction.
+// After the concurrent tasks finish, the result returned by each task is sorted by handle. Then the task
+// results are traversed to accumulate the total number of added rows and to advance the processed handle
+// value; the traversal stops at the first error.
+// Finally, the job's total row count is updated and the completed handle value is stored.
 func (d *ddl) addTableIndex(t table.Table, indexInfo *model.IndexInfo, reorgInfo *reorgInfo, job *model.Job) error {
 	cols := t.Cols()
 	colMap := make(map[int64]*types.FieldType)
@@ -375,48 +442,183 @@ func (d *ddl) addTableIndex(t table.Table, indexInfo *model.IndexInfo, reorgInfo
 		col := cols[v.Offset]
 		colMap[col.ID] = &col.FieldType
 	}
-	batchCnt := defaultSmallBatchCnt
-	batchOpInfo := &indexBatchOpInfo{
-		tblIndex:   tables.NewIndex(t.Meta(), indexInfo),
-		addedCount: job.GetRowCount(),
-		colMap:     colMap,
-		handle:     reorgInfo.Handle,
-		idxRecords: make([]*indexRecord, 0, batchCnt),
+	taskCnt := defaultTaskCnt
+	taskOpInfo := &indexTaskOpInfo{
+		tblIndex:  tables.NewIndex(t.Meta(), indexInfo),
+		colMap:    colMap,
+		nextCh:    make(chan int64, 1),
+		taskRetCh: make(chan *taskResult, taskCnt),
 	}
-	seekHandle := reorgInfo.Handle
+	addedCount := job.GetRowCount()
+	taskStartHandle := reorgInfo.Handle
 	for {
 		startTime := time.Now()
-		err := kv.RunInNewTxn(d.store, true, func(txn kv.Transaction) error {
-			err1 := d.isReorgRunnable(txn, ddlJobFlag)
-			if err1 != nil {
-				return errors.Trace(err1)
+		wg := sync.WaitGroup{}
+		for i := 0; i < taskCnt; i++ {
+			wg.Add(1)
+			go d.doBackfillIndexTask(t, taskOpInfo, taskStartHandle, &wg)
+			doneHandle := <-taskOpInfo.nextCh
+			// There is no more data to seek.
+			if doneHandle == taskStartHandle {
+				break
 			}
-			batchOpInfo.idxRecords = batchOpInfo.idxRecords[:0]
-			err1 = d.backfillIndexInTxn(t, txn, batchOpInfo, seekHandle)
+			taskStartHandle = doneHandle + 1
+		}
+		wg.Wait()
+
+		retCnt := len(taskOpInfo.taskRetCh)
+		taskAddedCount, doneHandle, err := getCountAndHandle(taskOpInfo)
+		// Update the reorg handle that has been processed.
+		if taskAddedCount != 0 {
+			err1 := kv.RunInNewTxn(d.store, true, func(txn kv.Transaction) error {
+				return errors.Trace(reorgInfo.UpdateHandle(txn, doneHandle+1))
+			})
 			if err1 != nil {
-				return errors.Trace(err1)
+				if err == nil {
+					err = err1
+				} else {
+					log.Warnf("[ddl] add index failed when updating the processed handle %d, err %v", doneHandle, err1)
+				}
 			}
-			// Update the reorg handle that has been processed.
-			return errors.Trace(reorgInfo.UpdateHandle(txn, batchOpInfo.handle))
-		})
+		}
+
+		addedCount += int64(taskAddedCount)
 		sub := time.Since(startTime).Seconds()
 		if err != nil {
-			log.Warnf("[ddl] added index for %v rows failed, take time %v", batchOpInfo.addedCount, sub)
+			log.Warnf("[ddl] added index for %d rows in total, this batch added index for %d rows and failed, take time %v",
+				addedCount, taskAddedCount, sub)
 			return errors.Trace(err)
 		}
-
-		job.SetRowCount(batchOpInfo.addedCount)
+		job.SetRowCount(addedCount)
 		batchHandleDataHistogram.WithLabelValues(batchAddIdx).Observe(sub)
-		log.Infof("[ddl] added index for %v rows, take time %v", batchOpInfo.addedCount, sub)
+		log.Infof("[ddl] added index for %d rows in total, this batch added index for %d rows, take time %v",
+			addedCount, taskAddedCount, sub)

-		if len(batchOpInfo.idxRecords) < batchCnt {
+		if retCnt < taskCnt {
 			return nil
 		}
-		seekHandle = batchOpInfo.handle + 1
 	}
 }

+// handleInfo records the start and end handles used in a task.
+type handleInfo struct {
+	startHandle int64
+	endHandle   int64
+	isSent      bool // Ensures that endHandle is assigned and sent only once.
+}
+
+func (h *handleInfo) isFinished(input int64) bool {
+	return h.isSent && input >= h.endHandle
+}
+
+func getCountAndHandle(taskOpInfo *indexTaskOpInfo) (int64, int64, error) {
+	l := len(taskOpInfo.taskRetCh)
+	taskRets := make([]*taskResult, 0, l)
+	for i := 0; i < l; i++ {
+		taskRet := <-taskOpInfo.taskRetCh
+		taskRets = append(taskRets, taskRet)
+	}
+	sort.Sort(taskRetSlice(taskRets))
+
+	taskAddedCount, currHandle := int64(0), int64(0)
+	var err error
+	for _, ret := range taskRets {
+		if ret.err != nil {
+			err = ret.err
+			break
+		}
+		taskAddedCount += int64(ret.count)
+		currHandle = ret.doneHandle
+	}
+	return taskAddedCount, currHandle, errors.Trace(err)
+}
+
+func (d *ddl) doBackfillIndexTask(t table.Table, taskOpInfo *indexTaskOpInfo, startHandle int64, wg *sync.WaitGroup) {
+	defer wg.Done()
+
+	ret := new(taskResult)
+	handleInfo := &handleInfo{startHandle: startHandle}
+	err := kv.RunInNewTxn(d.store, true, func(txn kv.Transaction) error {
+		err1 := d.isReorgRunnable(txn, ddlJobFlag)
+		if err1 != nil {
+			return errors.Trace(err1)
+		}
+		ret = d.doBackfillIndexTaskInTxn(t, txn, taskOpInfo, handleInfo)
+		if ret.err != nil {
+			return errors.Trace(ret.err)
+		}
+		return nil
+	})
+	if err != nil {
+		ret.err = errors.Trace(err)
+	}
+
+	// If fetching the row keys failed, the next start handle was never sent;
+	// unblock the dispatcher with the unchanged start handle.
+	if !handleInfo.isSent {
+		taskOpInfo.nextCh <- startHandle
+	}
+
+	taskOpInfo.taskRetCh <- ret
+}
+
+// doBackfillIndexTaskInTxn deals with a part of the index backfill in a single transaction.
+// This part covers at most defaultTaskHandleCnt rows.
+func (d *ddl) doBackfillIndexTaskInTxn(t table.Table, txn kv.Transaction, taskOpInfo *indexTaskOpInfo,
+	handleInfo *handleInfo) *taskResult {
+	idxRecords, taskRet := d.fetchRowColVals(txn, t, taskOpInfo, handleInfo)
+	if taskRet.err != nil {
+		taskRet.err = errors.Trace(taskRet.err)
+		return taskRet
+	}
+
+	for _, idxRecord := range idxRecords {
+		log.Debug("[ddl] backfill index...", idxRecord.handle)
+		err := txn.LockKeys(idxRecord.key)
+		if err != nil {
+			taskRet.err = errors.Trace(err)
+			return taskRet
+		}
+
+		// Create the index.
+		handle, err := taskOpInfo.tblIndex.Create(txn, idxRecord.vals, idxRecord.handle)
+		if err != nil {
+			if terror.ErrorEqual(err, kv.ErrKeyExists) && idxRecord.handle == handle {
+				// Index already exists, skip it.
+				continue
+			}
+			taskRet.err = errors.Trace(err)
+			return taskRet
+		}
+	}
+	return taskRet
+}
+
+func (d *ddl) dropTableIndex(indexInfo *model.IndexInfo, job *model.Job) error {
+	startKey := tablecodec.EncodeTableIndexPrefix(job.TableID, indexInfo.ID)
+	// It's asynchronous, so it doesn't need to consider whether it completes.
+	deleteAll := -1
+	_, _, err := d.delKeysWithStartKey(startKey, startKey, ddlJobFlag, job, deleteAll)
+	return errors.Trace(err)
+}
+
+func findIndexByName(idxName string, indices []*model.IndexInfo) *model.IndexInfo {
+	for _, idx := range indices {
+		if idx.Name.L == idxName {
+			return idx
+		}
+	}
+	return nil
+}
+
+func allocateIndexID(tblInfo *model.TableInfo) int64 {
+	tblInfo.MaxIndexID++
+	return tblInfo.MaxIndexID
+}
+
 // recordIterFunc is used for low-level record iteration.
 type recordIterFunc func(h int64, rowKey kv.Key, rawRecord []byte) (more bool, err error)

@@ -461,67 +663,3 @@ func (d *ddl) iterateSnapshotRows(t table.Table, version uint64, seekHandle int6

 	return nil
 }
-
-type indexBatchOpInfo struct {
-	tblIndex   table.Index
-	addedCount int64
-	handle     int64 // This is the last reorg handle that has been processed.
-	colMap     map[int64]*types.FieldType
-	idxRecords []*indexRecord
-}
-
-type indexRecord struct {
-	handle int64
-	key    []byte
-	vals   []types.Datum
-}
-
-// backfillIndexInTxn deals with a part of backfilling index data in a Transaction.
-// This part of the index data rows is defaultSmallBatchCnt.
-func (d *ddl) backfillIndexInTxn(t table.Table, txn kv.Transaction, batchOpInfo *indexBatchOpInfo, seekHandle int64) error {
-	err := d.fetchRowColVals(txn, t, batchOpInfo, seekHandle)
-	if err != nil {
-		return errors.Trace(err)
-	}
-
-	for _, idxRecord := range batchOpInfo.idxRecords {
-		log.Debug("[ddl] backfill index...", idxRecord.handle)
-		err = txn.LockKeys(idxRecord.key)
-		if err != nil {
-			return errors.Trace(err)
-		}
-
-		// Create the index.
-		handle, err := batchOpInfo.tblIndex.Create(txn, idxRecord.vals, idxRecord.handle)
-		if err != nil {
-			if terror.ErrorEqual(err, kv.ErrKeyExists) && idxRecord.handle == handle {
-				// Index already exists, skip it.
-				continue
-			}
-			return errors.Trace(err)
-		}
-	}
-	return nil
-}
-
-func (d *ddl) dropTableIndex(indexInfo *model.IndexInfo, job *model.Job) error {
-	startKey := tablecodec.EncodeTableIndexPrefix(job.TableID, indexInfo.ID)
-	// It's asynchronous so it doesn't need to consider if it completes.
-	deleteAll := -1
-	_, _, err := d.delKeysWithStartKey(startKey, startKey, ddlJobFlag, job, deleteAll)
-	return errors.Trace(err)
-}
-
-func findIndexByName(idxName string, indices []*model.IndexInfo) *model.IndexInfo {
-	for _, idx := range indices {
-		if idx.Name.L == idxName {
-			return idx
-		}
-	}
-	return nil
-}
-
-func allocateIndexID(tblInfo *model.TableInfo) int64 {
-	tblInfo.MaxIndexID++
-	return tblInfo.MaxIndexID
-}
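
The dispatch half of this patch is the serial range handoff described in the addTableIndex comment block: a worker claims its handle range, sends the next start handle on nextCh before doing the heavy work, and only the heavy work overlaps across workers. Below is a minimal standalone sketch of that pattern, not part of the patch; the names (worker, result) and the fixed range size of 128 handles are illustrative assumptions.

package main

import (
	"fmt"
	"sync"
)

// result mirrors taskResult: the last handle a task processed and its row count.
type result struct {
	doneHandle int64
	count      int
}

// worker claims the range starting at startHandle, hands the next start handle
// to the dispatcher via nextCh before the heavy work, then processes its range
// concurrently with the other workers and reports on retCh.
func worker(startHandle int64, nextCh chan<- int64, retCh chan<- *result, wg *sync.WaitGroup) {
	defer wg.Done()
	// Stand-in for iterateSnapshotRows: pretend we scanned 128 handles.
	doneHandle := startHandle + 127
	nextCh <- doneHandle // unblock the dispatcher; range acquisition stays serial
	// ... decode rows and create index entries here, concurrently ...
	retCh <- &result{doneHandle: doneHandle, count: 128}
}

func main() {
	const taskCnt = 4
	nextCh := make(chan int64, 1)
	retCh := make(chan *result, taskCnt)
	var wg sync.WaitGroup
	startHandle := int64(0)
	for i := 0; i < taskCnt; i++ {
		wg.Add(1)
		go worker(startHandle, nextCh, retCh, &wg)
		startHandle = <-nextCh + 1 // wait until the range is claimed
	}
	wg.Wait()
	close(retCh)
	for ret := range retCh {
		fmt.Printf("task done through handle %d (%d rows)\n", ret.doneHandle, ret.count)
	}
}

One design note grounded in the patch itself: a transaction retry cannot change a task's range, because handleInfo.endHandle is fixed the first time the range is claimed and isFinished stops the snapshot iteration at that handle, so a retry can't affect the other tasks.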
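
The aggregation half mirrors getCountAndHandle: because tasks finish in any order, their results are sorted by doneHandle and counts are accumulated only up to the first error, so the persisted reorg handle never skips past a failed range. A sketch of that step under the same illustrative naming (byHandle plays the role of taskRetSlice):

package main

import (
	"errors"
	"fmt"
	"sort"
)

type result struct {
	doneHandle int64
	count      int
	err        error
}

// byHandle sorts results by their last processed handle.
type byHandle []*result

func (b byHandle) Len() int           { return len(b) }
func (b byHandle) Less(i, j int) bool { return b[i].doneHandle < b[j].doneHandle }
func (b byHandle) Swap(i, j int)      { b[i], b[j] = b[j], b[i] }

// aggregate accumulates row counts in handle order and stops at the first
// error, returning the last handle that is safe to persist.
func aggregate(rets []*result) (added int64, doneHandle int64, err error) {
	sort.Sort(byHandle(rets))
	for _, r := range rets {
		if r.err != nil {
			return added, doneHandle, r.err
		}
		added += int64(r.count)
		doneHandle = r.doneHandle
	}
	return added, doneHandle, nil
}

func main() {
	rets := []*result{
		{doneHandle: 255, count: 128},
		{doneHandle: 127, count: 128},
		{doneHandle: 383, count: 96, err: errors.New("lock conflict")},
	}
	added, doneHandle, err := aggregate(rets)
	// Only the rows before the failed range are counted; the reorg handle
	// stops at 255, so the failed range is retried in the next round.
	fmt.Println(added, doneHandle, err)
}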