Merge remote-tracking branch 'origin/siddontang/dev-schema-change' into qiuyesuifeng/schema-change-add-column
ddl/ddl.go (10 lines changed)

@@ -68,15 +68,15 @@ type ddl struct {
 	uuid      string
 	jobCh     chan struct{}
 	jobDoneCh chan struct{}
-	// reOrgDoneCh is for reorgnization, if the reorgnization job is done,
+	// reorgDoneCh is for reorganization, if the reorganization job is done,
 	// we will use this channel to notify outer.
-	// TODO: now we use goroutine to simulate reorgnization jobs, later we may
+	// TODO: now we use goroutine to simulate reorganization jobs, later we may
 	// use a persistent job list.
-	reOrgDoneCh chan error
+	reorgDoneCh chan error

-	// reOrgHandle is used for adding data reorgnization, after every batch,
+	// reorgHandle is used for adding data reorganization, after every batch,
 	// we will update for later job update.
-	reOrgHandle int64
+	reorgHandle int64

 	quitCh chan struct{}
 	wait   sync.WaitGroup
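
Note: the two renamed fields cooperate — the background reorganization goroutine records its progress in `reorgHandle` via `sync/atomic` and signals completion on `reorgDoneCh` so the DDL owner can pick up the result. A minimal, self-contained sketch of that pattern (the `worker` type and its fake batch loop are illustrative stand-ins, not TiDB code):

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// worker mimics the ddl struct's reorg bookkeeping: a buffered done channel
// for the final error and an atomically updated handle that records how far
// the data reorganization has progressed.
type worker struct {
	reorgDoneCh chan error
	reorgHandle int64
}

func (w *worker) run(batches int) {
	w.reorgDoneCh = make(chan error, 1)
	go func() {
		for h := 1; h <= batches; h++ {
			// after every successful batch, publish the handle so the
			// owner can persist it into the job for later resuming.
			atomic.StoreInt64(&w.reorgHandle, int64(h))
		}
		w.reorgDoneCh <- nil // notify outer that the job is done
	}()
}

func main() {
	w := &worker{}
	w.run(3)
	err := <-w.reorgDoneCh
	fmt.Println("done:", err, "handle:", atomic.LoadInt64(&w.reorgHandle))
}
```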

ddl/index.go (36 lines changed)

@@ -156,18 +156,18 @@ func (d *ddl) onIndexCreate(t *meta.Meta, job *model.Job) error {
 		return errors.Trace(err)
 	case model.StateWriteOnly:
 		// write only -> reorganization
-		job.SchemaState = model.StateReorgnization
-		indexInfo.State = model.StateReorgnization
-		// initialize SnapshotVer to 0 for later reorgnization check.
+		job.SchemaState = model.StateReorganization
+		indexInfo.State = model.StateReorganization
+		// initialize SnapshotVer to 0 for later reorganization check.
 		job.SnapshotVer = 0
 		// initialize reorg handle to 0
-		job.ReOrgHandle = 0
-		atomic.StoreInt64(&d.reOrgHandle, 0)
+		job.ReorgHandle = 0
+		atomic.StoreInt64(&d.reorgHandle, 0)
 		err = t.UpdateTable(schemaID, tblInfo)
 		return errors.Trace(err)
-	case model.StateReorgnization:
+	case model.StateReorganization:
 		// reorganization -> public
-		// get the current version for reorgnization if we don't have
+		// get the current version for reorganization if we don't have
 		if job.SnapshotVer == 0 {
 			var ver kv.Version
 			ver, err = d.store.CurrentVersion()

@@ -185,12 +185,12 @@ func (d *ddl) onIndexCreate(t *meta.Meta, job *model.Job) error {
 		}

 		err = d.runReorgJob(func() error {
-			return d.addTableIndex(tbl, indexInfo, job.SnapshotVer, job.ReOrgHandle)
+			return d.addTableIndex(tbl, indexInfo, job.SnapshotVer, job.ReorgHandle)
 		})

-		// addTableIndex updates ReOrgHandle after one batch.
-		// so we update the job ReOrgHandle here.
-		job.ReOrgHandle = atomic.LoadInt64(&d.reOrgHandle)
+		// addTableIndex updates ReorgHandle after one batch.
+		// so we update the job ReorgHandle here.
+		job.ReorgHandle = atomic.LoadInt64(&d.reorgHandle)

 		if errors2.ErrorEqual(err, errWaitReorgTimeout) {
 			// if timeout, we should return, check for the owner and re-wait job done.

@@ -261,11 +261,11 @@ func (d *ddl) onIndexDrop(t *meta.Meta, job *model.Job) error {
 		return errors.Trace(err)
 	case model.StateDeleteOnly:
 		// delete only -> reorganization
-		job.SchemaState = model.StateReorgnization
-		indexInfo.State = model.StateReorgnization
+		job.SchemaState = model.StateReorganization
+		indexInfo.State = model.StateReorganization
 		err = t.UpdateTable(schemaID, tblInfo)
 		return errors.Trace(err)
-	case model.StateReorgnization:
+	case model.StateReorganization:
 		// reorganization -> absent
 		tbl, err := d.getTable(t, schemaID, tblInfo)
 		if err != nil {

@@ -284,7 +284,7 @@ func (d *ddl) onIndexDrop(t *meta.Meta, job *model.Job) error {
 			return errors.Trace(err)
 		}

-		// all reorgnization jobs done, drop this index
+		// all reorganization jobs done, drop this index
 		newIndices := make([]*model.IndexInfo, 0, len(tblInfo.Indices))
 		for _, idx := range tblInfo.Indices {
 			if idx.Name.L != indexName.L {

@@ -345,7 +345,7 @@ func fetchRowColVals(txn kv.Transaction, t table.Table, handle int64, indexInfo

 const maxBatchSize = 1024

-// How to add index in reorgnization state?
+// How to add index in reorganization state?
 // 1, Generate a snapshot with special version.
 // 2, Traverse the snapshot, get every row in the table.
 // 3, For one row, if the row has been already deleted, skip to next row.

@@ -367,8 +367,8 @@ func (d *ddl) addTableIndex(t table.Table, indexInfo *model.IndexInfo, version u
 			return errors.Trace(err)
 		}

-		// update reOrgHandle here after every successful batch.
-		atomic.StoreInt64(&d.reOrgHandle, seekHandle)
+		// update reorgHandle here after every successful batch.
+		atomic.StoreInt64(&d.reorgHandle, seekHandle)
 	}
 }

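Note: a simplified, runnable sketch of the batch loop those comments describe — scan from a start handle, skip rows that were already deleted or already processed, and store the last handle of every successful batch so a restarted job can resume. `addIndexInBatches` and the in-memory snapshot are hypothetical; the real work happens in `addTableIndex` over a kv snapshot:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

const maxBatchSize = 1024

// row stands in for one snapshot row keyed by its int64 handle.
type row struct {
	handle  int64
	deleted bool
}

// addIndexInBatches walks the snapshot in batches of at most maxBatchSize
// rows and checkpoints progress after each batch so a restart can resume
// from reorgHandle instead of from zero.
func addIndexInBatches(snapshot []row, startHandle int64, reorgHandle *int64) {
	seekHandle := startHandle
	for i := 0; i < len(snapshot); i += maxBatchSize {
		end := i + maxBatchSize
		if end > len(snapshot) {
			end = len(snapshot)
		}
		for _, r := range snapshot[i:end] {
			if r.handle <= seekHandle || r.deleted {
				continue // processed in an earlier run, or already gone
			}
			seekHandle = r.handle // "create the index entry" for this row
		}
		// publish progress after every successful batch
		atomic.StoreInt64(reorgHandle, seekHandle)
	}
}

func main() {
	snap := []row{{1, false}, {2, true}, {3, false}}
	var reorgHandle int64
	addIndexInBatches(snap, 0, &reorgHandle)
	fmt.Println("resume from handle:", atomic.LoadInt64(&reorgHandle)) // 3
}
```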

ddl/reorg.go (34 lines changed)

@@ -23,16 +23,16 @@ import (
 	"github.com/pingcap/tidb/kv"
 )

-var _ context.Context = &reOrgContext{}
+var _ context.Context = &reorgContext{}

-// reOrgContext implements context.Context interface for reorgnization use.
-type reOrgContext struct {
+// reorgContext implements context.Context interface for reorganization use.
+type reorgContext struct {
 	store kv.Storage
 	m     map[fmt.Stringer]interface{}
 	txn   kv.Transaction
 }

-func (c *reOrgContext) GetTxn(forceNew bool) (kv.Transaction, error) {
+func (c *reorgContext) GetTxn(forceNew bool) (kv.Transaction, error) {
 	if forceNew {
 		if c.txn != nil {
 			if err := c.txn.Commit(); err != nil {

@@ -55,7 +55,7 @@ func (c *reOrgContext) GetTxn(forceNew bool) (kv.Transaction, error) {
 	return c.txn, nil
 }

-func (c *reOrgContext) FinishTxn(rollback bool) error {
+func (c *reorgContext) FinishTxn(rollback bool) error {
 	if c.txn == nil {
 		return nil
 	}

@@ -72,20 +72,20 @@ func (c *reOrgContext) FinishTxn(rollback bool) error {
 	return errors.Trace(err)
 }

-func (c *reOrgContext) SetValue(key fmt.Stringer, value interface{}) {
+func (c *reorgContext) SetValue(key fmt.Stringer, value interface{}) {
 	c.m[key] = value
 }

-func (c *reOrgContext) Value(key fmt.Stringer) interface{} {
+func (c *reorgContext) Value(key fmt.Stringer) interface{} {
 	return c.m[key]
 }

-func (c *reOrgContext) ClearValue(key fmt.Stringer) {
+func (c *reorgContext) ClearValue(key fmt.Stringer) {
 	delete(c.m, key)
 }

 func (d *ddl) newReorgContext() context.Context {
-	c := &reOrgContext{
+	c := &reorgContext{
 		store: d.store,
 		m:     make(map[fmt.Stringer]interface{}),
 	}

@@ -95,23 +95,23 @@ func (d *ddl) newReorgContext() context.Context {

 const waitReorgTimeout = 10 * time.Second

-var errWaitReorgTimeout = errors.New("wait for reorgnization timeout")
+var errWaitReorgTimeout = errors.New("wait for reorganization timeout")

 func (d *ddl) runReorgJob(f func() error) error {
-	if d.reOrgDoneCh == nil {
-		// start a reorgnization job
-		d.reOrgDoneCh = make(chan error, 1)
+	if d.reorgDoneCh == nil {
+		// start a reorganization job
+		d.reorgDoneCh = make(chan error, 1)
 		go func() {
-			d.reOrgDoneCh <- f()
+			d.reorgDoneCh <- f()
 		}()
 	}

 	waitTimeout := chooseLeaseTime(d.lease, waitReorgTimeout)

-	// wait reorgnization job done or timeout
+	// wait reorganization job done or timeout
 	select {
-	case err := <-d.reOrgDoneCh:
-		d.reOrgDoneCh = nil
+	case err := <-d.reorgDoneCh:
+		d.reorgDoneCh = nil
 		return errors.Trace(err)
 	case <-time.After(waitTimeout):
 		// if timeout, we will return, check the owner and retry wait job done again.

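Note: the run-once-then-wait shape of runReorgJob above can be tried in isolation — the first call starts the job in a goroutine, later calls just keep waiting on the same channel, and a timeout lets the owner return, re-check ownership, and retry. A self-contained sketch of the same pattern (simplified names, no lease handling):

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

var errWaitReorgTimeout = errors.New("wait for reorganization timeout")

// runner mimics runReorgJob: the first call starts the job in a goroutine,
// and every call waits either for the result or for a timeout.
type runner struct {
	doneCh chan error
}

func (r *runner) runReorgJob(f func() error, waitTimeout time.Duration) error {
	if r.doneCh == nil {
		r.doneCh = make(chan error, 1) // buffered so the goroutine never blocks
		go func() { r.doneCh <- f() }()
	}
	select {
	case err := <-r.doneCh:
		r.doneCh = nil // job finished; allow a new one to start next time
		return err
	case <-time.After(waitTimeout):
		return errWaitReorgTimeout
	}
}

func main() {
	r := &runner{}
	job := func() error { time.Sleep(30 * time.Millisecond); return nil }
	for {
		err := r.runReorgJob(job, 10*time.Millisecond)
		if errors.Is(err, errWaitReorgTimeout) {
			fmt.Println("timed out, retrying")
			continue
		}
		fmt.Println("reorg done:", err)
		return
	}
}
```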

@@ -119,13 +119,13 @@ func (d *ddl) onSchemaDrop(t *meta.Meta, job *model.Job) error {
 		err = t.UpdateDatabase(dbInfo)
 		return errors.Trace(err)
 	case model.StateDeleteOnly:
-		// delete only -> reorgnization
-		job.SchemaState = model.StateReorgnization
-		dbInfo.State = model.StateReorgnization
+		// delete only -> reorganization
+		job.SchemaState = model.StateReorganization
+		dbInfo.State = model.StateReorganization
 		err = t.UpdateDatabase(dbInfo)
 		return errors.Trace(err)
-	case model.StateReorgnization:
-		// wait reorgnization jobs done and drop meta.
+	case model.StateReorganization:
+		// wait reorganization jobs done and drop meta.
 		var tables []*model.TableInfo
 		tables, err = t.ListTables(dbInfo.ID)
 		if err != nil {

@@ -144,7 +144,7 @@ func (d *ddl) onSchemaDrop(t *meta.Meta, job *model.Job) error {
 			return errors.Trace(err)
 		}

-		// all reorgnization jobs done, drop this database
+		// all reorganization jobs done, drop this database
 		if err = t.DropDatabase(dbInfo.ID); err != nil {
 			return errors.Trace(err)
 		}

@@ -126,11 +126,11 @@ func (d *ddl) onTableDrop(t *meta.Meta, job *model.Job) error {
 		return errors.Trace(err)
 	case model.StateDeleteOnly:
 		// delete only -> reorganization
-		job.SchemaState = model.StateReorgnization
-		tblInfo.State = model.StateReorgnization
+		job.SchemaState = model.StateReorganization
+		tblInfo.State = model.StateReorganization
 		err = t.UpdateTable(schemaID, tblInfo)
 		return errors.Trace(err)
-	case model.StateReorgnization:
+	case model.StateReorganization:
 		// reorganization -> absent
 		var tbl table.Table
 		tbl, err = d.getTable(t, schemaID, tblInfo)

@@ -150,7 +150,7 @@ func (d *ddl) onTableDrop(t *meta.Meta, job *model.Job) error {
 			return errors.Trace(err)
 		}

-		// all reorgnization jobs done, drop this database
+		// all reorganization jobs done, drop this database
 		if err = t.DropTable(schemaID, tableID); err != nil {
 			return errors.Trace(err)
 		}

@@ -57,7 +57,7 @@ func (d *ddl) startJob(ctx context.Context, job *model.Job) error {

 	var historyJob *model.Job

-	// for a job from start to end, the state of it will be none -> delete only -> write only -> reorgnization -> public
+	// for a job from start to end, the state of it will be none -> delete only -> write only -> reorganization -> public
 	// for every state change, we will wait as lease 2 * lease time, so here the ticker check is 10 * lease.
 	ticker := time.NewTicker(chooseLeaseTime(10*d.lease, 10*time.Second))
 	defer ticker.Stop()

@@ -80,11 +80,11 @@ type Job struct {
 	// snapshot version for this job.
 	SnapshotVer  uint64 `json:"snapshot_ver"`
 	LastUpdateTS int64  `json:"last_update_ts"`
-	// For reorgnization adding data job like add index, add column,
+	// For reorganization adding data job like add index, add column,
 	// we will traverse the huge snapshot and add the data in batches.
 	// After some successful batches, we will update the ReorgHandle for
 	// later resuming if currect server crashed.
-	ReOrgHandle int64 `json:"reorg_handle"`
+	ReorgHandle int64 `json:"reorg_handle"`
 }

 // Encode encodes job with json format.

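Note: the ReOrgHandle → ReorgHandle rename only touches the Go field name; the `reorg_handle` json tag is unchanged, so jobs already persisted via Encode keep round-tripping. A trimmed-down sketch with only the fields shown above and hypothetical values:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// job is a stand-in for model.Job, keeping only the fields from the hunk
// above; the real struct has more members.
type job struct {
	SnapshotVer  uint64 `json:"snapshot_ver"`
	LastUpdateTS int64  `json:"last_update_ts"`
	ReorgHandle  int64  `json:"reorg_handle"`
}

func main() {
	j := job{SnapshotVer: 42, LastUpdateTS: 1700000000, ReorgHandle: 1024}
	// the renamed Go field keeps the same json tag, so the encoded form
	// is identical before and after the rename.
	b, err := json.Marshal(j)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(b)) // {"snapshot_ver":42,"last_update_ts":1700000000,"reorg_handle":1024}
}
```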

@@ -30,8 +30,8 @@ const (
 	// StateWriteOnly means we can use any write operation on this schema element,
 	// but outer can't read the changed data.
 	StateWriteOnly
-	// StateReorgnization meas we are re-orgnizating whole data for this shema changed.
-	StateReorgnization
+	// StateReorganization means we are re-organizating whole data for this shema changed.
+	StateReorganization
 	// StatePublic means this schema element is ok for all write and read operations.
 	StatePublic
 )

@@ -43,8 +43,8 @@ func (s SchemaState) String() string {
 		return "delete only"
 	case StateWriteOnly:
 		return "write only"
-	case StateReorgnization:
-		return "reorgnization"
+	case StateReorganization:
+		return "reorganization"
 	case StatePublic:
 		return "public"
 	default:
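
Note: putting the constants and String() above together, the schema-change ladder reads end to end as in the startJob comment. A stand-alone copy for experimentation (StateNone, StateDeleteOnly, and the default branch are assumed here; the real definitions live in the model package):

```go
package main

import "fmt"

// SchemaState is a stand-alone copy of the state ladder that DDL jobs walk.
type SchemaState int

const (
	StateNone SchemaState = iota
	StateDeleteOnly
	StateWriteOnly
	StateReorganization
	StatePublic
)

func (s SchemaState) String() string {
	switch s {
	case StateDeleteOnly:
		return "delete only"
	case StateWriteOnly:
		return "write only"
	case StateReorganization:
		return "reorganization"
	case StatePublic:
		return "public"
	default:
		return "none"
	}
}

func main() {
	// a job from start to end: none -> delete only -> write only ->
	// reorganization -> public
	for s := StateNone; s <= StatePublic; s++ {
		fmt.Println(s)
	}
}
```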