*: update log and comments
This commit is contained in:
@ -344,5 +344,5 @@ func statement(ctx context.Context, sql string) stmt.Statement {
|
||||
}
|
||||
|
||||
func init() {
|
||||
log.SetLevelByString("warn")
|
||||
log.SetLevelByString("error")
|
||||
}
|
||||
|
||||
@ -25,50 +25,48 @@ import (
|
||||
)
|
||||
|
||||
func (d *ddl) handleTaskQueue() error {
|
||||
for {
|
||||
if d.isClosed() {
|
||||
if d.isClosed() {
|
||||
return nil
|
||||
}
|
||||
|
||||
task := &model.Job{}
|
||||
err := kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error {
|
||||
t := meta.NewMeta(txn)
|
||||
owner, err := d.checkOwner(t, ddlTaskFlag)
|
||||
if terror.ErrorEqual(err, ErrNotOwner) {
|
||||
return nil
|
||||
}
|
||||
|
||||
task := &model.Job{}
|
||||
err := kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error {
|
||||
t := meta.NewMeta(txn)
|
||||
owner, err := d.checkOwner(t, ddlTaskFlag)
|
||||
if terror.ErrorEqual(err, ErrNotOwner) {
|
||||
return nil
|
||||
}
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
// get the first task and run
|
||||
task, err = d.getFirstTask(t)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
if task == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
d.runTask(t, task)
|
||||
if task.IsFinished() {
|
||||
err = d.finishTask(t, task)
|
||||
} else {
|
||||
err = d.updateTask(t, task)
|
||||
}
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
owner.LastUpdateTS = time.Now().UnixNano()
|
||||
err = t.SetDDLTaskOwner(owner)
|
||||
|
||||
return errors.Trace(err)
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
// get the first task and run
|
||||
task, err = d.getFirstTask(t)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
if task == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
d.runTask(t, task)
|
||||
if task.IsFinished() {
|
||||
err = d.finishTask(t, task)
|
||||
} else {
|
||||
err = d.updateTask(t, task)
|
||||
}
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
owner.LastUpdateTS = time.Now().UnixNano()
|
||||
err = t.SetDDLTaskOwner(owner)
|
||||
|
||||
return errors.Trace(err)
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
return nil
|
||||
@ -108,8 +106,9 @@ func (d *ddl) prepareTask(job *model.Job) error {
|
||||
SchemaID: job.SchemaID,
|
||||
TableID: job.TableID,
|
||||
Type: job.Type,
|
||||
// TODO:
|
||||
Args: job.Args,
|
||||
}
|
||||
copy(task.Args, job.Args)
|
||||
|
||||
err := kv.RunInNewTxn(d.store, true, func(txn kv.Transaction) error {
|
||||
t := meta.NewMeta(txn)
|
||||
@ -123,8 +122,8 @@ func (d *ddl) prepareTask(job *model.Job) error {
|
||||
|
||||
func (d *ddl) startTask(tp model.ActionType) {
|
||||
switch tp {
|
||||
case model.ActionDropSchema, model.ActionDropTable,
|
||||
model.ActionDropColumn, model.ActionDropIndex:
|
||||
case model.ActionDropSchema, model.ActionDropTable:
|
||||
// model.ActionDropColumn, model.ActionDropIndex:
|
||||
asyncNotify(d.taskCh)
|
||||
}
|
||||
}
|
||||
|
||||
10
ddl/table.go
10
ddl/table.go
@ -78,11 +78,16 @@ func (d *ddl) onCreateTable(t *meta.Meta, job *model.Job) error {
|
||||
|
||||
func (d *ddl) delReorgTable(t *meta.Meta, task *model.Job) error {
|
||||
// reorganization -> absent
|
||||
if len(task.Args) != 1 || task.Args[0] == nil {
|
||||
tblInfo := &model.TableInfo{}
|
||||
err := task.DecodeArgs(tblInfo)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
if tblInfo == nil {
|
||||
task.State = model.JobCancelled
|
||||
return errors.Trace(infoschema.TableNotExists)
|
||||
}
|
||||
tbl, err := d.getTable(task.SchemaID, task.Args[0].(*model.TableInfo))
|
||||
tbl, err := d.getTable(task.SchemaID, tblInfo)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
@ -141,6 +146,7 @@ func (d *ddl) onDropTable(t *meta.Meta, job *model.Job) error {
|
||||
break
|
||||
}
|
||||
// finish this job
|
||||
job.Args = []interface{}{tblInfo}
|
||||
job.State = model.JobDone
|
||||
job.SchemaState = model.StateNone
|
||||
default:
|
||||
|
||||
@ -179,8 +179,8 @@ func (d *ddl) finishJob(t *meta.Meta, job *model.Job) error {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
switch job.Type {
|
||||
case model.ActionDropSchema, model.ActionDropTable,
|
||||
model.ActionDropColumn, model.ActionDropIndex:
|
||||
case model.ActionDropSchema, model.ActionDropTable:
|
||||
// model.ActionDropColumn, model.ActionDropIndex:
|
||||
if err = d.prepareTask(job); err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
@ -212,7 +212,6 @@ func (d *ddl) handleJobQueue() error {
|
||||
var job *model.Job
|
||||
err := kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error {
|
||||
t := meta.NewMeta(txn)
|
||||
//owner, err := d.checkOwner(t)
|
||||
owner, err := d.checkOwner(t, ddlJobFlag)
|
||||
if terror.ErrorEqual(err, ErrNotOwner) {
|
||||
// we are not owner, return and retry checking later.
|
||||
@ -266,11 +265,6 @@ func (d *ddl) handleJobQueue() error {
|
||||
|
||||
return errors.Trace(err)
|
||||
})
|
||||
|
||||
if job != nil && job.IsFinished() {
|
||||
d.startTask(job.Type)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
} else if job == nil {
|
||||
@ -288,6 +282,7 @@ func (d *ddl) handleJobQueue() error {
|
||||
}
|
||||
|
||||
if job.IsFinished() {
|
||||
d.startTask(job.Type)
|
||||
asyncNotify(d.jobDoneCh)
|
||||
}
|
||||
}
|
||||
|
||||
@ -626,12 +626,12 @@ func (m *Meta) DeQueueDDLTask() (*model.Job, error) {
|
||||
return m.deQueueDDLJob(mDDLTaskListKey)
|
||||
}
|
||||
|
||||
// GetDDLOwner gets the current task owner for DDL.
|
||||
// GetDDLTaskOwner gets the current task owner for DDL.
|
||||
func (m *Meta) GetDDLTaskOwner() (*model.Owner, error) {
|
||||
return m.getDDLOwner(mDDLTaskOwnerKey)
|
||||
}
|
||||
|
||||
// SetDDLOwner sets the current task owner for DDL.
|
||||
// SetDDLTaskOwner sets the current task owner for DDL.
|
||||
func (m *Meta) SetDDLTaskOwner(o *model.Owner) error {
|
||||
return m.setDDLOwner(mDDLTaskOwnerKey, o)
|
||||
}
|
||||
|
||||
@ -56,7 +56,7 @@ func (txn *hbaseTxn) Get(k kv.Key) ([]byte, error) {
|
||||
}
|
||||
|
||||
func (txn *hbaseTxn) Set(k kv.Key, v []byte) error {
|
||||
log.Debugf("[kv] seek %q txn:%d", k, txn.tid)
|
||||
log.Debugf("[kv] set %q txn:%d", k, txn.tid)
|
||||
return txn.us.Set(k, v)
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user