ddl: use ErrNotOwner
This commit is contained in:
@ -95,10 +95,10 @@ func asyncNotify(ch chan struct{}) {
|
||||
}
|
||||
}
|
||||
|
||||
func (d *ddl) verifyOwner(t *meta.TMeta) (bool, error) {
|
||||
func (d *ddl) verifyOwner(t *meta.TMeta) error {
|
||||
owner, err := t.GetDDLOwner()
|
||||
if err != nil {
|
||||
return false, errors.Trace(err)
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
if owner == nil {
|
||||
@ -117,14 +117,20 @@ func (d *ddl) verifyOwner(t *meta.TMeta) (bool, error) {
|
||||
err = t.SetDDLOwner(owner)
|
||||
}
|
||||
|
||||
return owner.OwnerID == d.uuid, errors.Trace(err)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
} else if owner.OwnerID != d.uuid {
|
||||
return errors.Trace(ErrNotOwner)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// every time we enter another state, we must call this function.
|
||||
func (d *ddl) updateJob(t *meta.TMeta, j *model.Job) error {
|
||||
isOwner, err := d.verifyOwner(t)
|
||||
if !isOwner {
|
||||
return errors.Errorf("not owner, can't update job")
|
||||
err := d.verifyOwner(t)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
err = t.UpdateDDLJob(0, j)
|
||||
@ -154,9 +160,9 @@ func (d *ddl) getFirstJob() (*model.Job, error) {
|
||||
func (d *ddl) finishJob(job *model.Job) error {
|
||||
// done, notice and run next job.
|
||||
err := d.meta.RunInNewTxn(false, func(t *meta.TMeta) error {
|
||||
isOwner, err := d.verifyOwner(t)
|
||||
if !isOwner {
|
||||
return errors.Errorf("not owner, can't finish job")
|
||||
err := d.verifyOwner(t)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
_, err = t.DeQueueDDLJob()
|
||||
@ -172,26 +178,24 @@ func (d *ddl) finishJob(job *model.Job) error {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
func (d *ddl) checkOwner() (bool, error) {
|
||||
var isOwner bool
|
||||
func (d *ddl) checkOwner() error {
|
||||
err := d.meta.RunInNewTxn(false, func(t *meta.TMeta) error {
|
||||
var err1 error
|
||||
isOwner, err1 = d.verifyOwner(t)
|
||||
return errors.Trace(err1)
|
||||
return errors.Trace(d.verifyOwner(t))
|
||||
})
|
||||
|
||||
return isOwner, errors.Trace(err)
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
// ErrNotOwner means we are not owner and can't handle DDL jobs.
|
||||
var ErrNotOwner = errors.New("DDL: not owner")
|
||||
|
||||
func (d *ddl) handleJobQueue() error {
|
||||
for {
|
||||
isOwner, err := d.checkOwner()
|
||||
|
||||
if err != nil || !isOwner {
|
||||
// error or not owner
|
||||
err := d.checkOwner()
|
||||
if err != nil {
|
||||
// error
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
// become the owner
|
||||
// get the first job and run
|
||||
var job *model.Job
|
||||
|
||||
Reference in New Issue
Block a user