@@ -69,9 +69,9 @@ type ddl struct {
 	uuid        string
 	jobCh       chan struct{}
 	jobDoneCh   chan struct{}
-	// reOrgDoneCh is for re-orgnization, if the re-orgnization job is done,
+	// reOrgDoneCh is for reorgnization, if the reorgnization job is done,
 	// we will use this channel to notify outer.
-	// TODO: now we use goroutine to simulate re-orgnization jobs, later we may
+	// TODO: now we use goroutine to simulate reorgnization jobs, later we may
 	// use a persistent job list.
 	reOrgDoneCh chan error
 }
ddl/reorg.go
@@ -24,13 +24,11 @@ import (
 
 var _ context.Context = &reOrgContext{}
 
-// reOrgContext implements context.Context interface for re-orgnization use.
+// reOrgContext implements context.Context interface for reorgnization use.
 type reOrgContext struct {
 	store kv.Storage
-
-	m map[fmt.Stringer]interface{}
-
-	txn kv.Transaction
+	m     map[fmt.Stringer]interface{}
+	txn   kv.Transaction
 }
 
 func (c *reOrgContext) GetTxn(forceNew bool) (kv.Transaction, error) {
@@ -85,7 +83,7 @@ func (c *reOrgContext) ClearValue(key fmt.Stringer) {
 	delete(c.m, key)
 }
 
-func (d *ddl) newReOrgContext() context.Context {
+func (d *ddl) newReorgContext() context.Context {
 	c := &reOrgContext{
 		store: d.store,
 		m:     make(map[fmt.Stringer]interface{}),
@@ -94,30 +92,29 @@ func (d *ddl) newReOrgContext() context.Context {
 	return c
 }
 
-const waitReOrgTimeout = 10 * time.Second
+const waitReorgTimeout = 10 * time.Second
 
-var errWaitReOrgTimeout = errors.New("wait re-orgnization done timeout")
+var errWaitReorgTimeout = errors.New("wait for reorgnization timeout")
 
-func (d *ddl) runReOrgJob(f func() error) error {
-	// wait re-orgnization jobs done
-	// TODO use persistent re-orgnization job list.
+func (d *ddl) runReorgJob(f func() error) error {
+	// wait reorgnization jobs done
+	// TODO use persistent reorgnization job list.
 	if d.reOrgDoneCh == nil {
-		// start a re-orgnization job
+		// start a reorgnization job
 		d.reOrgDoneCh = make(chan error, 1)
 
 		go func() {
 			d.reOrgDoneCh <- f()
 		}()
 	}
 
-	// wait re-orgnization job done or timeout
+	// wait reorgnization job done or timeout
 	select {
 	case err := <-d.reOrgDoneCh:
 		d.reOrgDoneCh = nil
 		return errors.Trace(err)
-	case <-time.After(waitReOrgTimeout):
+	case <-time.After(waitReorgTimeout):
 		// if timeout, we will return, check the owner and retry wait job done again.
-		return errWaitReOrgTimeout
+		return errWaitReorgTimeout
 	}
-
 }
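Note (not part of the diff): runReorgJob follows a start-once, poll-with-timeout pattern. The first call launches the reorganization work in a goroutine and caches its done channel; every call then waits up to waitReorgTimeout before handing control back, so the DDL owner can simply retry on a later run. Below is a minimal, self-contained sketch of that pattern; the worker/runJob names and the durations are illustrative only, not identifiers from the TiDB code.

package main

import (
	"errors"
	"fmt"
	"time"
)

// errWaitTimeout stands in for errWaitReorgTimeout in this sketch.
var errWaitTimeout = errors.New("wait for background job timeout")

type worker struct {
	doneCh chan error // nil until a background job has been started
}

// runJob starts f in a goroutine on the first call, then waits for it to
// finish or for the timeout to fire, mirroring the shape of runReorgJob.
func (w *worker) runJob(f func() error, timeout time.Duration) error {
	if w.doneCh == nil {
		w.doneCh = make(chan error, 1)
		go func() { w.doneCh <- f() }()
	}
	select {
	case err := <-w.doneCh:
		w.doneCh = nil // job finished; a new one may be started later
		return err
	case <-time.After(timeout):
		return errWaitTimeout
	}
}

func main() {
	w := &worker{}
	job := func() error { time.Sleep(120 * time.Millisecond); return nil }
	for {
		err := w.runJob(job, 50*time.Millisecond)
		if errors.Is(err, errWaitTimeout) {
			fmt.Println("not done yet, retrying")
			continue
		}
		fmt.Println("job finished, err =", err)
		return
	}
}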
@@ -19,13 +19,12 @@ import (
 	"github.com/pingcap/tidb/meta/autoid"
 	"github.com/pingcap/tidb/model"
 	"github.com/pingcap/tidb/table"
-	"github.com/reborndb/go/errors2"
+	"github.com/pingcap/tidb/util/errors2"
 )
 
 func (d *ddl) onSchemaCreate(t *meta.TMeta, job *model.Job) error {
 	schemaID := job.SchemaID
-
-	name := model.CIStr{}
+	var name model.CIStr
 	if err := job.DecodeArgs(&name); err != nil {
 		// arg error, cancel this job.
 		job.State = model.JobCancelled
@@ -41,7 +40,7 @@ func (d *ddl) onSchemaCreate(t *meta.TMeta, job *model.Job) error {
 	for _, db := range dbs {
 		if db.Name.L == name.L {
 			if db.ID != schemaID {
-				// database exists, can't create, we can cancel this job now.
+				// database exists, can't create, we should cancel this job now.
 				job.State = model.JobCancelled
 				return errors.Trace(ErrExists)
 			}
@@ -94,7 +93,8 @@ func (d *ddl) onSchemaDrop(t *meta.TMeta, job *model.Job) error {
 	dbInfo, err := t.GetDatabase(job.SchemaID)
 	if err != nil {
 		return errors.Trace(err)
-	} else if dbInfo == nil {
+	}
+	if dbInfo == nil {
 		job.State = model.JobCancelled
 		return errors.Trace(ErrNotExists)
 	}
@@ -116,30 +116,31 @@ func (d *ddl) onSchemaDrop(t *meta.TMeta, job *model.Job) error {
 		err = t.UpdateDatabase(dbInfo)
 		return errors.Trace(err)
 	case model.StateDeleteOnly:
-		// delete only -> re orgnization
-		dbInfo.State = model.StateReOrgnization
+		// delete only -> reorgnization
+		dbInfo.State = model.StateReorgnization
 		err = t.UpdateDatabase(dbInfo)
 		return errors.Trace(err)
-	case model.StateReOrgnization:
-		// wait re-orgnization jobs done and drop meta.
+	case model.StateReorgnization:
+		// wait reorgnization jobs done and drop meta.
 		var tables []*model.TableInfo
 		tables, err = t.ListTables(dbInfo.ID)
 		if err != nil {
 			return errors.Trace(err)
 		}
 
-		err = d.runReOrgJob(func() error {
+		err = d.runReorgJob(func() error {
 			return d.dropSchemaData(dbInfo, tables)
 		})
 
-		if errors2.ErrorEqual(err, errWaitReOrgTimeout) {
-			// if timeout, we will return, check the owner and retry wait job done again.
+		if errors2.ErrorEqual(err, errWaitReorgTimeout) {
+			// if timeout, we should return, check for the owner and re-wait job done.
 			return nil
-		} else if err != nil {
+		}
+		if err != nil {
 			return errors.Trace(err)
 		}
 
-		// all re-orgnization jobs done, drop this database
+		// all reorgnization jobs done, drop this database
 		if err = t.DropDatabase(dbInfo.ID); err != nil {
 			return errors.Trace(err)
 		}
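Note (not part of the diff): dropping a schema walks the element through several states one DDL job run at a time, and the reorgnization step keeps returning without error until the background data removal finishes, so the job is simply retried on a later run. A rough sketch of that progression is below, using local stand-in types rather than the real model.SchemaState values.

package main

import "fmt"

// schemaState is a local stand-in for model.SchemaState in this sketch.
type schemaState int

const (
	statePublic schemaState = iota
	stateWriteOnly
	stateDeleteOnly
	stateReorgnization
	stateNone // schema fully dropped
)

// step advances a drop-schema job by one stage, the way each run of
// onSchemaDrop does; reorgDone stands in for the background data removal
// having finished.
func step(s schemaState, reorgDone bool) schemaState {
	switch s {
	case statePublic:
		return stateWriteOnly
	case stateWriteOnly:
		return stateDeleteOnly
	case stateDeleteOnly:
		return stateReorgnization
	case stateReorgnization:
		if !reorgDone {
			// not finished yet: stay here and let the owner retry later
			return stateReorgnization
		}
		return stateNone
	default:
		return s
	}
}

func main() {
	s := statePublic
	for run := 0; s != stateNone; run++ {
		s = step(s, run >= 4) // pretend the reorg work finishes on a later run
		fmt.Printf("after run %d: state %d\n", run, s)
	}
}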
@@ -154,18 +155,15 @@ func (d *ddl) onSchemaDrop(t *meta.TMeta, job *model.Job) error {
 }
 
 func (d *ddl) dropSchemaData(dbInfo *model.DBInfo, tables []*model.TableInfo) error {
-	ctx := d.newReOrgContext()
-	defer ctx.FinishTxn(true)
-
+	ctx := d.newReorgContext()
 	txn, err := ctx.GetTxn(true)
-
 	for _, tblInfo := range tables {
-
 		alloc := autoid.NewAllocator(d.meta, dbInfo.ID)
 		t := table.TableFromMeta(dbInfo.Name.L, alloc, tblInfo)
 
 		err = t.Truncate(ctx)
 		if err != nil {
+			ctx.FinishTxn(true)
 			return errors.Trace(err)
 		}
 
@@ -173,6 +171,7 @@ func (d *ddl) dropSchemaData(dbInfo *model.DBInfo, tables []*model.TableInfo) er
 		for _, v := range t.Indices() {
 			if v != nil && v.X != nil {
 				if err = v.X.Drop(txn); err != nil {
+					ctx.FinishTxn(true)
 					return errors.Trace(err)
 				}
 			}
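Note (not part of the diff): with the deferred ctx.FinishTxn(true) removed from dropSchemaData, the transaction is no longer rolled back automatically on every return, so the change adds an explicit FinishTxn(true) on each error path instead. A small sketch of the two cleanup styles, with made-up resource/doWork names:

package main

import (
	"errors"
	"fmt"
)

// resource is a made-up stand-in for a transaction-like object.
type resource struct{}

// finish mimics FinishTxn: rollback=true discards the work.
func (r *resource) finish(rollback bool) {
	fmt.Println("finish called, rollback =", rollback)
}

// before: a deferred rollback runs on every return path, success or failure.
func withDefer(r *resource, doWork func() error) error {
	defer r.finish(true)
	return doWork()
}

// after: clean up explicitly on the error paths only, leaving the success
// path free to commit elsewhere, as the updated dropSchemaData does.
func withExplicitCleanup(r *resource, doWork func() error) error {
	if err := doWork(); err != nil {
		r.finish(true)
		return err
	}
	return nil
}

func main() {
	r := &resource{}
	_ = withDefer(r, func() error { return nil })
	_ = withExplicitCleanup(r, func() error { return errors.New("boom") })
}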
@@ -30,8 +30,8 @@ const (
 	// StateWriteOnly means we can use any write operation on this schema element,
 	// but outer can't read the changed data.
 	StateWriteOnly
-	// StateReOrgnization meas we are re-orgnizating whole data for this shema changed.
-	StateReOrgnization
+	// StateReorgnization meas we are re-orgnizating whole data for this shema changed.
+	StateReorgnization
 	// StatePublic means this schema element is ok for all write and read operations.
 	StatePublic
 )
@@ -43,7 +43,7 @@ func (s SchemaState) String() string {
 		return "delete only"
 	case StateWriteOnly:
 		return "write only"
-	case StateReOrgnization:
+	case StateReorgnization:
 		return "reorgnization"
 	case StatePublic:
 		return "public"