Merge remote-tracking branch 'origin/siddontang/dev-schema-change' into qiuyesuifeng/schema-change-add-column

qiuyesuifeng
2015-11-04 12:48:44 +08:00
9 changed files with 111 additions and 58 deletions

View File

@@ -52,7 +52,7 @@ func (d *ddl) addColumn(tblInfo *model.TableInfo, spec *AlterSpecification) (*mo
if spec.Position.Type == ColumnPositionFirst {
position = 0
} else if spec.Position.Type == ColumnPositionAfter {
c := findCol(tblInfo.Columns, spec.Position.RelativeColumn)
c := findCol(cols, spec.Position.RelativeColumn)
if c == nil {
return nil, 0, errors.Errorf("No such column: %v", spec.Position.RelativeColumn)
}

View File

@@ -74,6 +74,10 @@ type ddl struct {
// use a persistent job list.
reOrgDoneCh chan error
// reOrgHandle is used for data-adding reorganization jobs; after every batch,
// we update it so the job can record its progress for later resuming.
reOrgHandle int64
quitCh chan struct{}
wait sync.WaitGroup
}
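The new reOrgHandle field is shared between the reorganization worker goroutine and the job runner, which is why the hunks below access it through sync/atomic rather than a mutex. A minimal, self-contained sketch of that publish/snapshot pattern (the local variable stands in for the struct field):

package main

import (
    "fmt"
    "sync/atomic"
)

func main() {
    var reOrgHandle int64 // stands in for ddl.reOrgHandle
    // The reorganization worker publishes its progress after each batch:
    atomic.StoreInt64(&reOrgHandle, 1024)
    // The job runner snapshots the progress when persisting the job:
    saved := atomic.LoadInt64(&reOrgHandle)
    fmt.Println("handle to persist:", saved) // handle to persist: 1024
}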

View File

@@ -49,10 +49,10 @@ func (ts *testSuite) SetUpSuite(c *C) {
ts.store = store
}
func (ts *testSuite) TestT(c *C) {
func (ts *testSuite) TestDDL(c *C) {
se, _ := tidb.CreateSession(ts.store)
ctx := se.(context.Context)
schemaName := model.NewCIStr("test")
schemaName := model.NewCIStr("test_ddl")
tblName := model.NewCIStr("t")
tbIdent := table.Ident{
Schema: schemaName,
@@ -74,27 +74,6 @@ func (ts *testSuite) TestT(c *C) {
err = sessionctx.GetDomain(ctx).DDL().CreateTable(ctx, tbIdent, tbStmt.Cols, tbStmt.Constraints)
c.Assert(errors2.ErrorEqual(err, ddl.ErrExists), IsTrue)
// Test index.
idxStmt := statement("CREATE INDEX idx_c ON t (c)").(*stmts.CreateIndexStmt)
idxName := model.NewCIStr(idxStmt.IndexName)
err = sessionctx.GetDomain(ctx).DDL().CreateIndex(ctx, tbIdent, idxStmt.Unique, idxName, idxStmt.IndexColNames)
c.Assert(err, IsNil)
tbs := sessionctx.GetDomain(ctx).InfoSchema().SchemaTables(tbIdent.Schema)
c.Assert(tbs, HasLen, 1)
err = sessionctx.GetDomain(ctx).DDL().DropIndex(ctx, tbIdent, idxName)
c.Assert(err, IsNil)
err = sessionctx.GetDomain(ctx).DDL().DropTable(ctx, tbIdent)
c.Assert(err, IsNil)
tbs = sessionctx.GetDomain(ctx).InfoSchema().SchemaTables(tbIdent.Schema)
c.Assert(tbs, HasLen, 0)
err = sessionctx.GetDomain(ctx).DDL().DropSchema(ctx, noExist)
c.Assert(errors2.ErrorEqual(err, ddl.ErrNotExists), IsTrue)
err = sessionctx.GetDomain(ctx).DDL().DropSchema(ctx, tbIdent.Schema)
c.Assert(err, IsNil)
// Test column.
c.Skip("Not completely support column ddl")
tbIdent2 := tbIdent
tbIdent2.Name = model.NewCIStr("t2")
tbStmt = statement("create table t2 (a int unique not null)").(*stmts.CreateTableStmt)
@@ -110,6 +89,9 @@ func (ts *testSuite) TestT(c *C) {
alterStmt := statement(`alter table t2 add b enum("bb") first`).(*stmts.AlterTableStmt)
sessionctx.GetDomain(ctx).DDL().AlterTable(ctx, tbIdent2, alterStmt.Specs)
c.Assert(alterStmt.Specs[0].String(), Not(Equals), "")
tb, err = sessionctx.GetDomain(ctx).InfoSchema().TableByName(tbIdent2.Schema, tbIdent2.Name)
c.Assert(err, IsNil)
c.Assert(tb, NotNil)
cols, err := tb.Row(ctx, rid0)
c.Assert(err, IsNil)
c.Assert(len(cols), Equals, 2)
@@ -168,20 +150,41 @@ func (ts *testSuite) TestT(c *C) {
alterStmt = statement("alter table t add column bb int after b").(*stmts.AlterTableStmt)
err = sessionctx.GetDomain(ctx).DDL().AlterTable(ctx, tbIdent, alterStmt.Specs)
c.Assert(err, NotNil)
idxStmt := statement("CREATE INDEX idx_c ON t (c)").(*stmts.CreateIndexStmt)
idxName := model.NewCIStr(idxStmt.IndexName)
err = sessionctx.GetDomain(ctx).DDL().CreateIndex(ctx, tbIdent, idxStmt.Unique, idxName, idxStmt.IndexColNames)
c.Assert(err, IsNil)
tbs := sessionctx.GetDomain(ctx).InfoSchema().SchemaTables(tbIdent.Schema)
c.Assert(len(tbs), Equals, 2)
err = sessionctx.GetDomain(ctx).DDL().DropIndex(ctx, tbIdent, idxName)
c.Assert(err, IsNil)
err = sessionctx.GetDomain(ctx).DDL().DropTable(ctx, tbIdent)
c.Assert(err, IsNil)
tbs = sessionctx.GetDomain(ctx).InfoSchema().SchemaTables(tbIdent.Schema)
c.Assert(len(tbs), Equals, 1)
err = sessionctx.GetDomain(ctx).DDL().DropSchema(ctx, noExist)
c.Assert(errors2.ErrorEqual(err, ddl.ErrNotExists), IsTrue)
err = sessionctx.GetDomain(ctx).DDL().DropSchema(ctx, tbIdent.Schema)
c.Assert(err, IsNil)
}
func (ts *testSuite) TestConstraintNames(c *C) {
se, _ := tidb.CreateSession(ts.store)
ctx := se.(context.Context)
schemaName := model.NewCIStr("test")
schemaName := model.NewCIStr("test_constraint")
tblName := model.NewCIStr("t")
tbIdent := table.Ident{
Schema: schemaName,
Name: tblName,
}
err := sessionctx.GetDomain(ctx).DDL().CreateSchema(ctx, tbIdent.Schema)
c.Assert(err, IsNil)
tbStmt := statement("create table t (a int, b int, index a (a, b), index a (a))").(*stmts.CreateTableStmt)
err := sessionctx.GetDomain(ctx).DDL().CreateTable(ctx, tbIdent, tbStmt.Cols, tbStmt.Constraints)
err = sessionctx.GetDomain(ctx).DDL().CreateTable(ctx, tbIdent, tbStmt.Cols, tbStmt.Constraints)
c.Assert(err, NotNil)
tbStmt = statement("create table t (a int, b int, index A (a, b), index (a))").(*stmts.CreateTableStmt)

View File

@@ -15,6 +15,7 @@ package ddl
import (
"bytes"
"sync/atomic"
"github.com/juju/errors"
"github.com/ngaut/log"
@@ -159,6 +160,9 @@ func (d *ddl) onIndexCreate(t *meta.Meta, job *model.Job) error {
indexInfo.State = model.StateReorgnization
// initialize SnapshotVer to 0 for the later reorganization check.
job.SnapshotVer = 0
// initialize the reorg handle to 0 as well.
job.ReOrgHandle = 0
atomic.StoreInt64(&d.reOrgHandle, 0)
err = t.UpdateTable(schemaID, tblInfo)
return errors.Trace(err)
case model.StateReorgnization:
@@ -181,9 +185,13 @@ func (d *ddl) onIndexCreate(t *meta.Meta, job *model.Job) error {
}
err = d.runReorgJob(func() error {
return d.addTableIndex(tbl, indexInfo, job.SnapshotVer)
return d.addTableIndex(tbl, indexInfo, job.SnapshotVer, job.ReOrgHandle)
})
// addTableIndex updates d.reOrgHandle after every batch,
// so sync it back to the job here for persistence.
job.ReOrgHandle = atomic.LoadInt64(&d.reOrgHandle)
if errors2.ErrorEqual(err, errWaitReorgTimeout) {
// If the wait timed out, return nil; the outer loop will check the owner and wait for the job again.
return nil
@@ -343,8 +351,7 @@ const maxBatchSize = 1024
// 3. For each row, if the row has already been deleted, skip to the next row.
// 4. If not deleted, check whether the index entry already exists; if it does, skip to the next row.
// 5. If the index entry doesn't exist, create it and continue with the next row.
func (d *ddl) addTableIndex(t table.Table, indexInfo *model.IndexInfo, version uint64) error {
seekHandle := int64(0)
func (d *ddl) addTableIndex(t table.Table, indexInfo *model.IndexInfo, version uint64, seekHandle int64) error {
for {
handles, err := d.getSnapshotRows(t, version, seekHandle)
if err != nil {
@@ -354,12 +361,14 @@ func (d *ddl) addTableIndex(t table.Table, indexInfo *model.IndexInfo, version u
}
seekHandle = handles[len(handles)-1] + 1
// TODO: save seekHandle in the reorganization job, so we can resume this job later from this handle.
err = d.backfillTableIndex(t, indexInfo, handles)
if err != nil {
return errors.Trace(err)
}
// update reOrgHandle here after every successful batch.
atomic.StoreInt64(&d.reOrgHandle, seekHandle)
}
}
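The resume arithmetic above is easy to check in isolation: after each batch, the next seek position is one past the last handle processed, and that same value is what gets published as the reorg handle. A toy sketch with hypothetical row handles and no kv store:

package main

import "fmt"

func main() {
    rows := []int64{1, 3, 7, 9, 12} // hypothetical existing row handles
    batchSize := 2
    seekHandle := int64(0) // 0 means start from the beginning
    for {
        // emulate getSnapshotRows: up to batchSize handles >= seekHandle
        var handles []int64
        for _, h := range rows {
            if h >= seekHandle && len(handles) < batchSize {
                handles = append(handles, h)
            }
        }
        if len(handles) == 0 {
            break
        }
        seekHandle = handles[len(handles)-1] + 1
        fmt.Println("batch:", handles, "resume handle:", seekHandle)
    }
}

Restarting the loop with any printed resume handle skips exactly the batches already done, which is why persisting it in the job makes the reorganization crash-safe.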
@@ -418,7 +427,7 @@ func (d *ddl) backfillTableIndex(t table.Table, indexInfo *model.IndexInfo, hand
kvX := kv.NewKVIndex(t.IndexPrefix(), indexInfo.Name.L, indexInfo.Unique)
for _, handle := range handles {
log.Error("building index...", handle, handles)
log.Debug("building index...", handle)
err := kv.RunInNewTxn(d.store, true, func(txn kv.Transaction) error {
// first check row exists
@@ -465,18 +474,6 @@ func (d *ddl) backfillTableIndex(t table.Table, indexInfo *model.IndexInfo, hand
}
func (d *ddl) dropTableIndex(t table.Table, indexInfo *model.IndexInfo) error {
err := kv.RunInNewTxn(d.store, true, func(txn kv.Transaction) error {
// Remove indices.
for _, v := range t.Indices() {
if v != nil && v.X != nil && v.Name.L == indexInfo.Name.L {
if err := v.X.Drop(txn); err != nil {
return errors.Trace(err)
}
}
}
return nil
})
err := d.delKeysWithPrefix(kv.GenIndexPrefix(t.IndexPrefix(), indexInfo.Name.L))
return errors.Trace(err)
}

View File

@@ -15,6 +15,7 @@ package ddl
import (
"fmt"
"strings"
"time"
"github.com/juju/errors"
@@ -97,7 +98,6 @@ const waitReorgTimeout = 10 * time.Second
var errWaitReorgTimeout = errors.New("wait for reorgnization timeout")
func (d *ddl) runReorgJob(f func() error) error {
// TODO: use a persistent reorganization job list.
if d.reOrgDoneCh == nil {
// start a reorganization job
d.reOrgDoneCh = make(chan error, 1)
@@ -120,5 +120,49 @@ func (d *ddl) runReorgJob(f func() error) error {
// we return errWaitReorgTimeout here too, so that the outer loop will break.
return errWaitReorgTimeout
}
}
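The function above implements a wait-with-timeout on the reorganization's done channel: if the job finishes first we get its error, otherwise errWaitReorgTimeout tells the caller to back off and retry later. A self-contained sketch of that pattern (names hypothetical, durations shortened):

package main

import (
    "errors"
    "fmt"
    "time"
)

var errWaitTimeout = errors.New("wait for reorganization timeout")

func waitReorg(done <-chan error, timeout time.Duration) error {
    select {
    case err := <-done:
        return err // reorganization finished (err may be nil)
    case <-time.After(timeout):
        return errWaitTimeout // caller returns and checks again later
    }
}

func main() {
    done := make(chan error, 1)
    go func() {
        time.Sleep(10 * time.Millisecond) // pretend to reorganize
        done <- nil
    }()
    fmt.Println(waitReorg(done, time.Millisecond)) // wait for reorganization timeout
    fmt.Println(waitReorg(done, time.Second))      // <nil>
}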
func (d *ddl) delKeysWithPrefix(prefix string) error {
keys := make([]string, 0, maxBatchSize)
for {
keys = keys[0:0]
err := kv.RunInNewTxn(d.store, true, func(txn kv.Transaction) error {
iter, err := txn.Seek([]byte(prefix))
if err != nil {
return errors.Trace(err)
}
defer iter.Close()
for i := 0; i < maxBatchSize; i++ {
if iter.Valid() && strings.HasPrefix(iter.Key(), prefix) {
keys = append(keys, iter.Key())
err = iter.Next()
if err != nil {
return errors.Trace(err)
}
} else {
break
}
}
for _, key := range keys {
err := txn.Delete([]byte(key))
if err != nil {
return errors.Trace(err)
}
}
return nil
})
if err != nil {
return errors.Trace(err)
}
// if this round deleted no keys, everything under the prefix is gone; return.
if len(keys) == 0 {
return nil
}
}
}
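delKeysWithPrefix deliberately deletes in bounded batches, one transaction per round, and stops only once a round finds no more keys, so dropping a huge table never builds a single oversized transaction. The control flow, modeled on a plain map instead of a kv store (a toy sketch, not the real storage API):

package main

import (
    "fmt"
    "strings"
)

func main() {
    store := map[string]string{"p_a": "1", "p_b": "2", "p_c": "3", "q_d": "4"}
    prefix, maxBatchSize := "p_", 2
    for {
        // collect at most one batch of matching keys...
        var keys []string
        for k := range store {
            if strings.HasPrefix(k, prefix) && len(keys) < maxBatchSize {
                keys = append(keys, k)
            }
        }
        // ...delete them (one transaction per round in the real code)...
        for _, k := range keys {
            delete(store, k)
        }
        // ...and stop once a round deletes nothing.
        if len(keys) == 0 {
            break
        }
    }
    fmt.Println(store) // map[q_d:4]: only keys outside the prefix remain
}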

View File

@@ -160,16 +160,13 @@ func (d *ddl) onSchemaDrop(t *meta.Meta, job *model.Job) error {
}
func (d *ddl) dropSchemaData(dbInfo *model.DBInfo, tables []*model.TableInfo) error {
ctx := d.newReorgContext()
for _, tblInfo := range tables {
alloc := autoid.NewAllocator(d.store, dbInfo.ID)
t := table.TableFromMeta(alloc, tblInfo)
err := t.Truncate(ctx)
err := d.dropTableData(t)
if err != nil {
ctx.FinishTxn(true)
return errors.Trace(err)
}
}
return errors.Trace(ctx.FinishTxn(false))
return nil
}

View File

@@ -193,12 +193,14 @@ func (d *ddl) getTableInfo(t *meta.Meta, job *model.Job) (*model.TableInfo, erro
}
func (d *ddl) dropTableData(t table.Table) error {
ctx := d.newReorgContext()
if err := t.Truncate(ctx); err != nil {
ctx.FinishTxn(true)
// delete table data
err := d.delKeysWithPrefix(t.KeyPrefix())
if err != nil {
return errors.Trace(err)
}
return errors.Trace(ctx.FinishTxn(false))
// delete table index data
err = d.delKeysWithPrefix(t.IndexPrefix())
return errors.Trace(err)
}

View File

@@ -101,7 +101,8 @@ type kvIndex struct {
prefix string
}
func genIndexPrefix(indexPrefix, indexName string) string {
// GenIndexPrefix generates the index prefix.
func GenIndexPrefix(indexPrefix, indexName string) string {
// Use EncodeBytes to guarantee generating distinct index prefixes.
// e.g., given two indices c1 and c with index prefix p, without EncodeBytes
// the key formats look like p_c and p_c1; if c has an index value whose first encoded byte is '1',
@@ -116,7 +117,7 @@ func NewKVIndex(indexPrefix, indexName string, unique bool) Index {
return &kvIndex{
indexName: indexName,
unique: unique,
prefix: genIndexPrefix(indexPrefix, indexName),
prefix: GenIndexPrefix(indexPrefix, indexName),
}
}
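The collision the comment above warns about is easy to reproduce. In the sketch below, plain concatenation stands in for the unescaped encoding and a terminator byte stands in for EncodeBytes (both are illustrative stand-ins, not the real codec):

package main

import (
    "fmt"
    "strings"
)

func main() {
    // Without escaping: index "c" with a value starting in "1" yields a key
    // that also matches index "c1"'s prefix, so scans of c1 would see c's rows.
    keyC := "p_" + "c" + "1..."
    prefixC1 := "p_" + "c1"
    fmt.Println(strings.HasPrefix(keyC, prefixC1)) // true: ambiguous prefix

    // With a terminated encoding of the index name (stand-in for EncodeBytes),
    // the prefixes stay distinct no matter what value bytes follow.
    enc := func(s string) string { return s + "\x00" }
    keyC = "p_" + enc("c") + "1..."
    prefixC1 = "p_" + enc("c1")
    fmt.Println(strings.HasPrefix(keyC, prefixC1)) // false: no collision
}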

View File

@@ -80,6 +80,11 @@ type Job struct {
// snapshot version for this job.
SnapshotVer uint64 `json:"snapshot_ver"`
LastUpdateTS int64 `json:"last_update_ts"`
// For a reorganization job that adds data, such as add index or add column,
// we traverse the huge snapshot and add the data in batches.
// After each successful batch we update ReOrgHandle, so the job can be
// resumed later if the current server crashes.
ReOrgHandle int64 `json:"reorg_handle"`
}
// Encode encodes the job in JSON format.
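Since the job is persisted as JSON, the new field travels with it under the tag shown above. A trimmed-down, runnable sketch (only the fields visible in this hunk, tags copied from it; the values are made up):

package main

import (
    "encoding/json"
    "fmt"
)

// Job is a trimmed stand-in for model.Job, keeping only the
// fields shown in the hunk above.
type Job struct {
    SnapshotVer  uint64 `json:"snapshot_ver"`
    LastUpdateTS int64  `json:"last_update_ts"`
    ReOrgHandle  int64  `json:"reorg_handle"`
}

func main() {
    job := Job{SnapshotVer: 400, LastUpdateTS: 1446612524, ReOrgHandle: 1024}
    b, _ := json.Marshal(job)
    fmt.Println(string(b))
    // {"snapshot_ver":400,"last_update_ts":1446612524,"reorg_handle":1024}
}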