101
ddl/index.go
101
ddl/index.go
@ -325,30 +325,54 @@ func (d *ddl) onDropIndex(t *meta.Meta, job *model.Job) error {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
func fetchRowColVals(txn kv.Transaction, t table.Table, handle int64, indexInfo *model.IndexInfo) (
|
||||
kv.Key, []types.Datum, error) {
|
||||
// fetch datas
|
||||
func (d *ddl) fetchRowColVals(txn kv.Transaction, t table.Table, handles []int64, indexInfo *model.IndexInfo) (
|
||||
[]*indexRecord, error) {
|
||||
// Through handles access to get all row keys.
|
||||
handlesLen := len(handles)
|
||||
rowKeys := make([]kv.Key, 0, handlesLen)
|
||||
for _, h := range handles {
|
||||
rowKey := tablecodec.EncodeRecordKey(t.RecordPrefix(), h)
|
||||
rowKeys = append(rowKeys, rowKey)
|
||||
}
|
||||
|
||||
// Get corresponding raw values for rowKeys.
|
||||
ver := kv.Version{Ver: txn.StartTS()}
|
||||
snap, err := d.store.GetSnapshot(ver)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
pairMap, err := snap.BatchGet(rowKeys)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
|
||||
// Get corresponding values for pairMap.
|
||||
cols := t.Cols()
|
||||
colMap := make(map[int64]*types.FieldType)
|
||||
for _, v := range indexInfo.Columns {
|
||||
col := cols[v.Offset]
|
||||
colMap[col.ID] = &col.FieldType
|
||||
}
|
||||
rowKey := tablecodec.EncodeRecordKey(t.RecordPrefix(), handle)
|
||||
rowVal, err := txn.Get(rowKey)
|
||||
if err != nil {
|
||||
return nil, nil, errors.Trace(err)
|
||||
idxRecords := make([]*indexRecord, 0, handlesLen)
|
||||
for i, rowKey := range rowKeys {
|
||||
rawVal, ok := pairMap[string(rowKey)]
|
||||
if !ok {
|
||||
// Row doesn't exist, skip it.
|
||||
continue
|
||||
}
|
||||
row, err := tablecodec.DecodeRow(rawVal, colMap)
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
rowVal := make([]types.Datum, 0, len(indexInfo.Columns))
|
||||
for _, v := range indexInfo.Columns {
|
||||
col := cols[v.Offset]
|
||||
rowVal = append(rowVal, row[col.ID])
|
||||
}
|
||||
idxRecord := &indexRecord{handle: handles[i], key: rowKey, vals: rowVal}
|
||||
idxRecords = append(idxRecords, idxRecord)
|
||||
}
|
||||
row, err := tablecodec.DecodeRow(rowVal, colMap)
|
||||
if err != nil {
|
||||
return nil, nil, errors.Trace(err)
|
||||
}
|
||||
vals := make([]types.Datum, 0, len(indexInfo.Columns))
|
||||
for _, v := range indexInfo.Columns {
|
||||
col := cols[v.Offset]
|
||||
vals = append(vals, row[col.ID])
|
||||
}
|
||||
return rowKey, vals, nil
|
||||
return idxRecords, nil
|
||||
}
|
||||
|
||||
const defaultBatchCnt = 1024
|
||||
@ -432,43 +456,38 @@ func (d *ddl) getSnapshotRows(t table.Table, version uint64, seekHandle int64) (
|
||||
return handles, nil
|
||||
}
|
||||
|
||||
type indexRecord struct {
|
||||
handle int64
|
||||
key []byte
|
||||
vals []types.Datum
|
||||
}
|
||||
|
||||
// backfillIndexInTxn deals with a part of backfilling index data in a Transaction.
|
||||
// This part of the index data rows is defaultSmallBatchCnt.
|
||||
func (d *ddl) backfillIndexInTxn(t table.Table, kvIdx table.Index, handles []int64, txn kv.Transaction) (int64, error) {
|
||||
nextHandle := handles[0]
|
||||
for _, handle := range handles {
|
||||
log.Debug("[ddl] backfill index...", handle)
|
||||
rowKey, vals, err := fetchRowColVals(txn, t, handle, kvIdx.Meta())
|
||||
if terror.ErrorEqual(err, kv.ErrNotExist) {
|
||||
// Row doesn't exist, skip it.
|
||||
nextHandle = handle
|
||||
continue
|
||||
}
|
||||
if err != nil {
|
||||
return 0, errors.Trace(err)
|
||||
}
|
||||
idxRecords, err := d.fetchRowColVals(txn, t, handles, kvIdx.Meta())
|
||||
if err != nil {
|
||||
return 0, errors.Trace(err)
|
||||
}
|
||||
|
||||
exist, _, err := kvIdx.Exist(txn, vals, handle)
|
||||
if err != nil {
|
||||
return 0, errors.Trace(err)
|
||||
} else if exist {
|
||||
// Index already exists, skip it.
|
||||
nextHandle = handle
|
||||
continue
|
||||
}
|
||||
err = txn.LockKeys(rowKey)
|
||||
for _, idxRecord := range idxRecords {
|
||||
log.Debug("[ddl] backfill index...", idxRecord.handle)
|
||||
err = txn.LockKeys(idxRecord.key)
|
||||
if err != nil {
|
||||
return 0, errors.Trace(err)
|
||||
}
|
||||
|
||||
// Create the index.
|
||||
_, err = kvIdx.Create(txn, vals, handle)
|
||||
handle, err := kvIdx.Create(txn, idxRecord.vals, idxRecord.handle)
|
||||
if err != nil {
|
||||
if terror.ErrorEqual(err, kv.ErrKeyExists) && idxRecord.handle == handle {
|
||||
// Index already exists, skip it.
|
||||
continue
|
||||
}
|
||||
return 0, errors.Trace(err)
|
||||
}
|
||||
nextHandle = handle
|
||||
}
|
||||
return nextHandle, nil
|
||||
return idxRecords[len(idxRecords)-1].handle, nil
|
||||
}
|
||||
|
||||
func (d *ddl) backfillTableIndex(t table.Table, indexInfo *model.IndexInfo, handles []int64, reorgInfo *reorgInfo) error {
|
||||
|
||||
Reference in New Issue
Block a user