ddl: batch backfill column datas.
This commit is contained in:
@ -14,8 +14,6 @@
|
||||
package ddl
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
|
||||
"github.com/juju/errors"
|
||||
"github.com/ngaut/log"
|
||||
"github.com/pingcap/tidb/column"
|
||||
@ -24,7 +22,6 @@ import (
|
||||
"github.com/pingcap/tidb/model"
|
||||
"github.com/pingcap/tidb/table"
|
||||
"github.com/pingcap/tidb/table/tables"
|
||||
"github.com/pingcap/tidb/util"
|
||||
"github.com/pingcap/tidb/util/errors2"
|
||||
)
|
||||
|
||||
@ -201,48 +198,32 @@ func (d *ddl) onColumnDrop(t *meta.Meta, job *model.Job) error {
|
||||
}
|
||||
|
||||
func (d *ddl) backfillColumn(t table.Table, columnInfo *model.ColumnInfo, version uint64) error {
|
||||
ver := kv.Version{Ver: version}
|
||||
|
||||
snap, err := d.store.GetSnapshot(ver)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
defer snap.MvccRelease()
|
||||
|
||||
firstKey := t.FirstKey()
|
||||
prefix := []byte(t.KeyPrefix())
|
||||
|
||||
ctx := d.newReorgContext()
|
||||
txn, err := ctx.GetTxn(true)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
defer txn.Rollback()
|
||||
|
||||
it := snap.NewMvccIterator(kv.EncodeKey([]byte(firstKey)), ver)
|
||||
defer it.Close()
|
||||
|
||||
for it.Valid() {
|
||||
key := kv.DecodeKey([]byte(it.Key()))
|
||||
if !bytes.HasPrefix(key, prefix) {
|
||||
break
|
||||
seekHandle := int64(0)
|
||||
for {
|
||||
handles, err := d.getSnapshotRows(t, version, seekHandle)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
} else if len(handles) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
var handle int64
|
||||
handle, err = util.DecodeHandleFromRowKey(string(key))
|
||||
seekHandle = handles[len(handles)-1] + 1
|
||||
// TODO: save seekHandle in reorgnization job, so we can resume this job later from this handle.
|
||||
|
||||
err = d.backfillColumnData(t, columnInfo, handles)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (d *ddl) backfillColumnData(t table.Table, columnInfo *model.ColumnInfo, handles []int64) error {
|
||||
for _, handle := range handles {
|
||||
log.Info("backfill column...", handle)
|
||||
|
||||
// The first key in one row is the lock.
|
||||
lock := t.RecordKey(handle, nil)
|
||||
err = kv.RunInNewTxn(d.store, true, func(txn kv.Transaction) error {
|
||||
err := kv.RunInNewTxn(d.store, true, func(txn kv.Transaction) error {
|
||||
// First check if row exists.
|
||||
var exist bool
|
||||
exist, err = checkRowExist(txn, t, handle)
|
||||
exist, err := checkRowExist(txn, t, handle)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
} else if !exist {
|
||||
@ -260,7 +241,7 @@ func (d *ddl) backfillColumn(t table.Table, columnInfo *model.ColumnInfo, versio
|
||||
|
||||
// If row column doesn't exist, we need to backfill column.
|
||||
// Lock row first.
|
||||
err = txn.LockKeys(lock)
|
||||
err = txn.LockKeys(t.RecordKey(handle, nil))
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
@ -275,14 +256,10 @@ func (d *ddl) backfillColumn(t table.Table, columnInfo *model.ColumnInfo, versio
|
||||
return nil
|
||||
})
|
||||
|
||||
rk := kv.EncodeKey(lock)
|
||||
err = kv.NextUntil(it, util.RowKeyPrefixFilter(rk))
|
||||
if errors2.ErrorEqual(err, kv.ErrNotExist) {
|
||||
break
|
||||
} else if err != nil {
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
}
|
||||
|
||||
return errors.Trace(txn.Commit())
|
||||
return nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user