diff --git a/ddl/callback.go b/ddl/callback.go index e68467cdab..4c064de2fc 100644 --- a/ddl/callback.go +++ b/ddl/callback.go @@ -21,8 +21,8 @@ type Callback interface { OnChanged(err error) error // OnJobRunBefore is called before running job. OnJobRunBefore(job *model.Job) - // OnJobRunAfter is called after job is run. - OnJobRunAfter(job *model.Job) + // OnJobUpdated is called after the running job is updated. + OnJobUpdated(job *model.Job) } // BaseCallback implements Callback.OnChanged interface. @@ -39,7 +39,7 @@ func (c *BaseCallback) OnJobRunBefore(job *model.Job) { // Nothing to do. } -// OnJobRunAfter implements Callback.OnJobRunAfter interface. -func (c *BaseCallback) OnJobRunAfter(job *model.Job) { +// OnJobUpdated implements Callback.OnJobUpdated interface. +func (c *BaseCallback) OnJobUpdated(job *model.Job) { // Nothing to do. } diff --git a/ddl/callback_test.go b/ddl/callback_test.go new file mode 100644 index 0000000000..0b43d2a21c --- /dev/null +++ b/ddl/callback_test.go @@ -0,0 +1,23 @@ +// Copyright 2015 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package ddl + +import . "github.com/pingcap/check" + +func (s *testDDLSuite) TestCallback(c *C) { + cb := &BaseCallback{} + c.Assert(cb.OnChanged(nil), IsNil) + cb.OnJobRunBefore(nil) + cb.OnJobUpdated(nil) +} diff --git a/ddl/column_test.go b/ddl/column_test.go index 3fd3a85a2c..a5dd0b8e72 100644 --- a/ddl/column_test.go +++ b/ddl/column_test.go @@ -30,6 +30,16 @@ import ( var _ = Suite(&testColumnSuite{}) +type testDDLCallback struct { + *BaseCallback + + onJobUpdated func(*model.Job) +} + +func (tc *testDDLCallback) OnJobUpdated(job *model.Job) { + tc.onJobUpdated(job) +} + type testColumnSuite struct { store kv.Storage dbInfo *model.DBInfo @@ -49,6 +59,7 @@ func (s *testColumnSuite) SetUpSuite(c *C) { func (s *testColumnSuite) TearDownSuite(c *C) { testDropSchema(c, mock.NewContext(), s.d, s.dbInfo) s.d.close() + err := s.store.Close() c.Assert(err, IsNil) } @@ -324,8 +335,11 @@ func (s *testIndexSuite) checkDeleteOnlyColumn(c *C, ctx context.Context, d *ddl func (s *testIndexSuite) checkWriteOnlyColumn(c *C, ctx context.Context, d *ddl, tblInfo *model.TableInfo, handle int64, col *column.Col, row []interface{}, columnValue interface{}, isDropped bool) { t := testGetTable(c, d, s.dbInfo.ID, tblInfo.ID) + _, err := ctx.GetTxn(true) + c.Assert(err, IsNil) + i := int64(0) - err := t.IterRecords(ctx, t.FirstKey(), t.Cols(), func(h int64, data []interface{}, cols []*column.Col) (bool, error) { + err = t.IterRecords(ctx, t.FirstKey(), t.Cols(), func(h int64, data []interface{}, cols []*column.Col) (bool, error) { c.Assert(data, DeepEquals, row) i++ return true, nil @@ -381,9 +395,12 @@ func (s *testIndexSuite) checkWriteOnlyColumn(c *C, ctx context.Context, d *ddl, func (s *testIndexSuite) checkPublicColumn(c *C, ctx context.Context, d *ddl, tblInfo *model.TableInfo, handle int64, col *column.Col, row []interface{}, columnValue interface{}) { t := testGetTable(c, d, s.dbInfo.ID, tblInfo.ID) + _, err := ctx.GetTxn(true) + c.Assert(err, IsNil) + i := int64(0) oldRow := append(row, columnValue) - err := t.IterRecords(ctx, t.FirstKey(), t.Cols(), func(h int64, data []interface{}, cols []*column.Col) (bool, error) { + err = t.IterRecords(ctx, t.FirstKey(), t.Cols(), func(h int64, data []interface{}, cols []*column.Col) (bool, error) { c.Assert(data, DeepEquals, oldRow) i++ return true, nil @@ -434,7 +451,9 @@ func (s *testIndexSuite) checkPublicColumn(c *C, ctx context.Context, d *ddl, tb c.Assert(err, IsNil) } -func (s *testIndexSuite) checkAddOrDropColumn(c *C, state model.SchemaState, ctx context.Context, d *ddl, tblInfo *model.TableInfo, handle int64, col *column.Col, row []interface{}, columnValue interface{}, isDropped bool) { +func (s *testIndexSuite) checkAddOrDropColumn(c *C, state model.SchemaState, d *ddl, tblInfo *model.TableInfo, handle int64, col *column.Col, row []interface{}, columnValue interface{}, isDropped bool) { + ctx := testNewContext(c, d) + switch state { case model.StateNone: s.checkNoneColumn(c, ctx, d, tblInfo, handle, col, columnValue) @@ -461,17 +480,13 @@ func testGetColumn(t *tables.Table, name string) *column.Col { func (s *testIndexSuite) TestAddColumn(c *C) { d := newDDL(s.store, nil, nil, 100*time.Millisecond) - defer d.close() - tblInfo := testTableInfo(c, d, "t", 3) ctx := testNewContext(c, d) - defer ctx.FinishTxn(true) _, err := ctx.GetTxn(true) c.Assert(err, IsNil) testCreateTable(c, ctx, d, s.dbInfo, tblInfo) - defer testDropTable(c, ctx, d, s.dbInfo, tblInfo) t := testGetTable(c, d, s.dbInfo.ID, tblInfo.ID) @@ -479,67 +494,65 @@ func (s *testIndexSuite) TestAddColumn(c *C) { handle, err := t.AddRecord(ctx, row) c.Assert(err, IsNil) - ticker := time.NewTicker(d.lease) - done := make(chan *model.Job, 1) - err = ctx.FinishTxn(false) c.Assert(err, IsNil) colName := "c4" defaultColValue := int64(4) + checkOK := false + + tc := &testDDLCallback{} + tc.onJobUpdated = func(job *model.Job) { + if checkOK { + return + } + + t := testGetTable(c, d, s.dbInfo.ID, tblInfo.ID).(*tables.Table) + col := testGetColumn(t, colName) + if col == nil { + return + } + + s.checkAddOrDropColumn(c, col.State, d, tblInfo, handle, col, row, defaultColValue, false) + + if col.State == model.StatePublic { + checkOK = true + } + } + + d.hook = tc + + // Use local ddl for callback test. + s.d.close() + + d.close() + d.start() + + job := testCreateColumn(c, ctx, d, s.dbInfo, tblInfo, colName, &ColumnPosition{Type: ColumnPositionNone}, defaultColValue) + testCheckJobDone(c, d, job, true) _, err = ctx.GetTxn(true) c.Assert(err, IsNil) - go func() { - done <- testCreateColumn(c, ctx, s.d, s.dbInfo, tblInfo, colName, &ColumnPosition{Type: ColumnPositionNone}, defaultColValue) - }() + job = testDropTable(c, ctx, d, s.dbInfo, tblInfo) + testCheckJobDone(c, d, job, false) - lastCheckState := model.StateNone - col := &column.Col{} + err = ctx.FinishTxn(false) + c.Assert(err, IsNil) - for { - select { - case job := <-done: - testCheckJobDone(c, d, job, true) - s.checkAddOrDropColumn(c, model.StatePublic, ctx, d, tblInfo, handle, col, row, defaultColValue, false) - return - case <-ticker.C: - d.close() - - t := testGetTable(c, d, s.dbInfo.ID, tblInfo.ID).(*tables.Table) - col = testGetColumn(t, colName) - if col == nil { - continue - } - - // Here means column state is not changed, so just skipped. - if lastCheckState == col.State { - continue - } - - s.checkAddOrDropColumn(c, col.State, ctx, d, tblInfo, handle, col, row, defaultColValue, false) - - lastCheckState = col.State - - d.start() - } - } + d.close() + s.d.start() } func (s *testIndexSuite) TestDropColumn(c *C) { d := newDDL(s.store, nil, nil, 100*time.Millisecond) - defer d.close() - tblInfo := testTableInfo(c, d, "t", 4) ctx := testNewContext(c, d) - defer ctx.FinishTxn(true) _, err := ctx.GetTxn(true) c.Assert(err, IsNil) testCreateTable(c, ctx, d, s.dbInfo, tblInfo) - defer testDropTable(c, ctx, d, s.dbInfo, tblInfo) t := testGetTable(c, d, s.dbInfo.ID, tblInfo.ID) @@ -549,44 +562,50 @@ func (s *testIndexSuite) TestDropColumn(c *C) { handle, err := t.AddRecord(ctx, append(row, defaultColValue)) c.Assert(err, IsNil) - ticker := time.NewTicker(d.lease) - done := make(chan *model.Job, 1) + err = ctx.FinishTxn(false) + c.Assert(err, IsNil) + + checkOK := false + oldCol := &column.Col{} + + tc := &testDDLCallback{} + tc.onJobUpdated = func(job *model.Job) { + if checkOK { + return + } + + t := testGetTable(c, d, s.dbInfo.ID, tblInfo.ID).(*tables.Table) + col := testGetColumn(t, colName) + if col == nil { + s.checkAddOrDropColumn(c, model.StateNone, d, tblInfo, handle, oldCol, row, defaultColValue, true) + checkOK = true + return + } + + s.checkAddOrDropColumn(c, col.State, d, tblInfo, handle, col, row, defaultColValue, true) + oldCol = col + } + + d.hook = tc + + // Use local ddl for callback test. + s.d.close() + + d.close() + d.start() + + job := testDropColumn(c, ctx, s.d, s.dbInfo, tblInfo, colName, false) + testCheckJobDone(c, d, job, false) + + _, err = ctx.GetTxn(true) + c.Assert(err, IsNil) + + job = testDropTable(c, ctx, d, s.dbInfo, tblInfo) + testCheckJobDone(c, d, job, false) err = ctx.FinishTxn(false) c.Assert(err, IsNil) - go func() { - done <- testDropColumn(c, ctx, s.d, s.dbInfo, tblInfo, colName, false) - }() - - lastCheckState := model.StateNone - col := &column.Col{} - - for { - select { - case job := <-done: - testCheckJobDone(c, d, job, false) - s.checkAddOrDropColumn(c, model.StateNone, ctx, d, tblInfo, handle, col, row, defaultColValue, false) - return - case <-ticker.C: - d.close() - - t := testGetTable(c, d, s.dbInfo.ID, tblInfo.ID).(*tables.Table) - col = testGetColumn(t, colName) - if col == nil { - return - } - - // Here means column state is not changed, so just skipped. - if lastCheckState == col.State { - continue - } - - s.checkAddOrDropColumn(c, col.State, ctx, d, tblInfo, handle, col, row, defaultColValue, true) - - lastCheckState = col.State - - d.start() - } - } + d.close() + s.d.start() } diff --git a/ddl/worker.go b/ddl/worker.go index b29df76581..caa670202c 100644 --- a/ddl/worker.go +++ b/ddl/worker.go @@ -203,8 +203,6 @@ func (d *ddl) handleJobQueue() error { // and retry later if the job is not cancelled. d.runJob(t, job) - d.hook.OnJobRunAfter(job) - if job.State == model.JobDone || job.State == model.JobCancelled { err = d.finishJob(t, job) if err == nil { @@ -234,6 +232,8 @@ func (d *ddl) handleJobQueue() error { return nil } + d.hook.OnJobUpdated(job) + // here means the job enters another state (delete only, write only, public, etc...) or is cancelled. // if the job is done or still running, we will wait 2 * lease time to guarantee other servers to update // the newest schema.