*: Merge branch 'siddontang/dev-schema-change' into siddontang/update-reorg

This commit is contained in:
siddontang
2015-11-10 11:12:09 +08:00
4 changed files with 130 additions and 88 deletions

View File

@ -21,8 +21,8 @@ type Callback interface {
OnChanged(err error) error
// OnJobRunBefore is called before running job.
OnJobRunBefore(job *model.Job)
// OnJobRunAfter is called after job is run.
OnJobRunAfter(job *model.Job)
// OnJobUpdated is called after the running job is updated.
OnJobUpdated(job *model.Job)
}
// BaseCallback implements Callback.OnChanged interface.
@ -39,7 +39,7 @@ func (c *BaseCallback) OnJobRunBefore(job *model.Job) {
// Nothing to do.
}
// OnJobRunAfter implements Callback.OnJobRunAfter interface.
func (c *BaseCallback) OnJobRunAfter(job *model.Job) {
// OnJobUpdated implements Callback.OnJobUpdated interface.
func (c *BaseCallback) OnJobUpdated(job *model.Job) {
// Nothing to do.
}

23
ddl/callback_test.go Normal file
View File

@ -0,0 +1,23 @@
// Copyright 2015 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package ddl
import . "github.com/pingcap/check"
func (s *testDDLSuite) TestCallback(c *C) {
cb := &BaseCallback{}
c.Assert(cb.OnChanged(nil), IsNil)
cb.OnJobRunBefore(nil)
cb.OnJobUpdated(nil)
}

View File

@ -30,6 +30,16 @@ import (
var _ = Suite(&testColumnSuite{})
type testDDLCallback struct {
*BaseCallback
onJobUpdated func(*model.Job)
}
func (tc *testDDLCallback) OnJobUpdated(job *model.Job) {
tc.onJobUpdated(job)
}
type testColumnSuite struct {
store kv.Storage
dbInfo *model.DBInfo
@ -49,6 +59,7 @@ func (s *testColumnSuite) SetUpSuite(c *C) {
func (s *testColumnSuite) TearDownSuite(c *C) {
testDropSchema(c, mock.NewContext(), s.d, s.dbInfo)
s.d.close()
err := s.store.Close()
c.Assert(err, IsNil)
}
@ -324,8 +335,11 @@ func (s *testIndexSuite) checkDeleteOnlyColumn(c *C, ctx context.Context, d *ddl
func (s *testIndexSuite) checkWriteOnlyColumn(c *C, ctx context.Context, d *ddl, tblInfo *model.TableInfo, handle int64, col *column.Col, row []interface{}, columnValue interface{}, isDropped bool) {
t := testGetTable(c, d, s.dbInfo.ID, tblInfo.ID)
_, err := ctx.GetTxn(true)
c.Assert(err, IsNil)
i := int64(0)
err := t.IterRecords(ctx, t.FirstKey(), t.Cols(), func(h int64, data []interface{}, cols []*column.Col) (bool, error) {
err = t.IterRecords(ctx, t.FirstKey(), t.Cols(), func(h int64, data []interface{}, cols []*column.Col) (bool, error) {
c.Assert(data, DeepEquals, row)
i++
return true, nil
@ -381,9 +395,12 @@ func (s *testIndexSuite) checkWriteOnlyColumn(c *C, ctx context.Context, d *ddl,
func (s *testIndexSuite) checkPublicColumn(c *C, ctx context.Context, d *ddl, tblInfo *model.TableInfo, handle int64, col *column.Col, row []interface{}, columnValue interface{}) {
t := testGetTable(c, d, s.dbInfo.ID, tblInfo.ID)
_, err := ctx.GetTxn(true)
c.Assert(err, IsNil)
i := int64(0)
oldRow := append(row, columnValue)
err := t.IterRecords(ctx, t.FirstKey(), t.Cols(), func(h int64, data []interface{}, cols []*column.Col) (bool, error) {
err = t.IterRecords(ctx, t.FirstKey(), t.Cols(), func(h int64, data []interface{}, cols []*column.Col) (bool, error) {
c.Assert(data, DeepEquals, oldRow)
i++
return true, nil
@ -434,7 +451,9 @@ func (s *testIndexSuite) checkPublicColumn(c *C, ctx context.Context, d *ddl, tb
c.Assert(err, IsNil)
}
func (s *testIndexSuite) checkAddOrDropColumn(c *C, state model.SchemaState, ctx context.Context, d *ddl, tblInfo *model.TableInfo, handle int64, col *column.Col, row []interface{}, columnValue interface{}, isDropped bool) {
func (s *testIndexSuite) checkAddOrDropColumn(c *C, state model.SchemaState, d *ddl, tblInfo *model.TableInfo, handle int64, col *column.Col, row []interface{}, columnValue interface{}, isDropped bool) {
ctx := testNewContext(c, d)
switch state {
case model.StateNone:
s.checkNoneColumn(c, ctx, d, tblInfo, handle, col, columnValue)
@ -461,17 +480,13 @@ func testGetColumn(t *tables.Table, name string) *column.Col {
func (s *testIndexSuite) TestAddColumn(c *C) {
d := newDDL(s.store, nil, nil, 100*time.Millisecond)
defer d.close()
tblInfo := testTableInfo(c, d, "t", 3)
ctx := testNewContext(c, d)
defer ctx.FinishTxn(true)
_, err := ctx.GetTxn(true)
c.Assert(err, IsNil)
testCreateTable(c, ctx, d, s.dbInfo, tblInfo)
defer testDropTable(c, ctx, d, s.dbInfo, tblInfo)
t := testGetTable(c, d, s.dbInfo.ID, tblInfo.ID)
@ -479,67 +494,65 @@ func (s *testIndexSuite) TestAddColumn(c *C) {
handle, err := t.AddRecord(ctx, row)
c.Assert(err, IsNil)
ticker := time.NewTicker(d.lease)
done := make(chan *model.Job, 1)
err = ctx.FinishTxn(false)
c.Assert(err, IsNil)
colName := "c4"
defaultColValue := int64(4)
checkOK := false
tc := &testDDLCallback{}
tc.onJobUpdated = func(job *model.Job) {
if checkOK {
return
}
t := testGetTable(c, d, s.dbInfo.ID, tblInfo.ID).(*tables.Table)
col := testGetColumn(t, colName)
if col == nil {
return
}
s.checkAddOrDropColumn(c, col.State, d, tblInfo, handle, col, row, defaultColValue, false)
if col.State == model.StatePublic {
checkOK = true
}
}
d.hook = tc
// Use local ddl for callback test.
s.d.close()
d.close()
d.start()
job := testCreateColumn(c, ctx, d, s.dbInfo, tblInfo, colName, &ColumnPosition{Type: ColumnPositionNone}, defaultColValue)
testCheckJobDone(c, d, job, true)
_, err = ctx.GetTxn(true)
c.Assert(err, IsNil)
go func() {
done <- testCreateColumn(c, ctx, s.d, s.dbInfo, tblInfo, colName, &ColumnPosition{Type: ColumnPositionNone}, defaultColValue)
}()
job = testDropTable(c, ctx, d, s.dbInfo, tblInfo)
testCheckJobDone(c, d, job, false)
lastCheckState := model.StateNone
col := &column.Col{}
err = ctx.FinishTxn(false)
c.Assert(err, IsNil)
for {
select {
case job := <-done:
testCheckJobDone(c, d, job, true)
s.checkAddOrDropColumn(c, model.StatePublic, ctx, d, tblInfo, handle, col, row, defaultColValue, false)
return
case <-ticker.C:
d.close()
t := testGetTable(c, d, s.dbInfo.ID, tblInfo.ID).(*tables.Table)
col = testGetColumn(t, colName)
if col == nil {
continue
}
// Here means column state is not changed, so just skipped.
if lastCheckState == col.State {
continue
}
s.checkAddOrDropColumn(c, col.State, ctx, d, tblInfo, handle, col, row, defaultColValue, false)
lastCheckState = col.State
d.start()
}
}
d.close()
s.d.start()
}
func (s *testIndexSuite) TestDropColumn(c *C) {
d := newDDL(s.store, nil, nil, 100*time.Millisecond)
defer d.close()
tblInfo := testTableInfo(c, d, "t", 4)
ctx := testNewContext(c, d)
defer ctx.FinishTxn(true)
_, err := ctx.GetTxn(true)
c.Assert(err, IsNil)
testCreateTable(c, ctx, d, s.dbInfo, tblInfo)
defer testDropTable(c, ctx, d, s.dbInfo, tblInfo)
t := testGetTable(c, d, s.dbInfo.ID, tblInfo.ID)
@ -549,44 +562,50 @@ func (s *testIndexSuite) TestDropColumn(c *C) {
handle, err := t.AddRecord(ctx, append(row, defaultColValue))
c.Assert(err, IsNil)
ticker := time.NewTicker(d.lease)
done := make(chan *model.Job, 1)
err = ctx.FinishTxn(false)
c.Assert(err, IsNil)
checkOK := false
oldCol := &column.Col{}
tc := &testDDLCallback{}
tc.onJobUpdated = func(job *model.Job) {
if checkOK {
return
}
t := testGetTable(c, d, s.dbInfo.ID, tblInfo.ID).(*tables.Table)
col := testGetColumn(t, colName)
if col == nil {
s.checkAddOrDropColumn(c, model.StateNone, d, tblInfo, handle, oldCol, row, defaultColValue, true)
checkOK = true
return
}
s.checkAddOrDropColumn(c, col.State, d, tblInfo, handle, col, row, defaultColValue, true)
oldCol = col
}
d.hook = tc
// Use local ddl for callback test.
s.d.close()
d.close()
d.start()
job := testDropColumn(c, ctx, s.d, s.dbInfo, tblInfo, colName, false)
testCheckJobDone(c, d, job, false)
_, err = ctx.GetTxn(true)
c.Assert(err, IsNil)
job = testDropTable(c, ctx, d, s.dbInfo, tblInfo)
testCheckJobDone(c, d, job, false)
err = ctx.FinishTxn(false)
c.Assert(err, IsNil)
go func() {
done <- testDropColumn(c, ctx, s.d, s.dbInfo, tblInfo, colName, false)
}()
lastCheckState := model.StateNone
col := &column.Col{}
for {
select {
case job := <-done:
testCheckJobDone(c, d, job, false)
s.checkAddOrDropColumn(c, model.StateNone, ctx, d, tblInfo, handle, col, row, defaultColValue, false)
return
case <-ticker.C:
d.close()
t := testGetTable(c, d, s.dbInfo.ID, tblInfo.ID).(*tables.Table)
col = testGetColumn(t, colName)
if col == nil {
return
}
// Here means column state is not changed, so just skipped.
if lastCheckState == col.State {
continue
}
s.checkAddOrDropColumn(c, col.State, ctx, d, tblInfo, handle, col, row, defaultColValue, true)
lastCheckState = col.State
d.start()
}
}
d.close()
s.d.start()
}

View File

@ -203,8 +203,6 @@ func (d *ddl) handleJobQueue() error {
// and retry later if the job is not cancelled.
d.runJob(t, job)
d.hook.OnJobRunAfter(job)
if job.State == model.JobDone || job.State == model.JobCancelled {
err = d.finishJob(t, job)
if err == nil {
@ -234,6 +232,8 @@ func (d *ddl) handleJobQueue() error {
return nil
}
d.hook.OnJobUpdated(job)
// here means the job enters another state (delete only, write only, public, etc...) or is cancelled.
// if the job is done or still running, we will wait 2 * lease time to guarantee other servers to update
// the newest schema.