ddl: just test if owner before to run reorganization.
This commit is contained in:
@ -335,6 +335,10 @@ func (d *ddl) backfillColumnData(t table.Table, columnInfo *model.ColumnInfo, ha
|
||||
log.Info("backfill column...", handle)
|
||||
|
||||
err := kv.RunInNewTxn(d.store, true, func(txn kv.Transaction) error {
|
||||
if err := d.isOwnerInReorg(txn); err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
// First check if row exists.
|
||||
exist, err := checkRowExist(txn, t, handle)
|
||||
if err != nil {
|
||||
@ -401,6 +405,10 @@ func (d *ddl) dropTableColumn(t table.Table, colInfo *model.ColumnInfo, reorgInf
|
||||
seekHandle = handles[len(handles)-1] + 1
|
||||
|
||||
err = kv.RunInNewTxn(d.store, true, func(txn kv.Transaction) error {
|
||||
if err := d.isOwnerInReorg(txn); err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
var h int64
|
||||
for _, h = range handles {
|
||||
key := t.RecordKey(h, col)
|
||||
|
||||
@ -421,6 +421,10 @@ func (d *ddl) backfillTableIndex(t table.Table, indexInfo *model.IndexInfo, hand
|
||||
log.Debug("building index...", handle)
|
||||
|
||||
err := kv.RunInNewTxn(d.store, true, func(txn kv.Transaction) error {
|
||||
if err := d.isOwnerInReorg(txn); err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
// first check row exists
|
||||
exist, err := checkRowExist(txn, t, handle)
|
||||
if err != nil {
|
||||
|
||||
18
ddl/reorg.go
18
ddl/reorg.go
@ -124,12 +124,30 @@ func (d *ddl) runReorgJob(f func() error) error {
|
||||
}
|
||||
}
|
||||
|
||||
func (d *ddl) isOwnerInReorg(txn kv.Transaction) error {
|
||||
t := meta.NewMeta(txn)
|
||||
owner, err := t.GetDDLOwner()
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
} else if owner == nil || owner.OwnerID != d.uuid {
|
||||
// if no owner, we will try later, so here just return error.
|
||||
// or another server is owner, return error too.
|
||||
return errors.Trace(ErrNotOwner)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *ddl) delKeysWithPrefix(prefix string) error {
|
||||
keys := make([]string, maxBatchSize)
|
||||
|
||||
for {
|
||||
keys := keys[0:0]
|
||||
err := kv.RunInNewTxn(d.store, true, func(txn kv.Transaction) error {
|
||||
if err := d.isOwnerInReorg(txn); err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
iter, err := txn.Seek([]byte(prefix))
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
|
||||
Reference in New Issue
Block a user