From 132960e9c8b87e85c471ab0b538b573825c87f2c Mon Sep 17 00:00:00 2001 From: qiuyesuifeng Date: Mon, 9 Nov 2015 21:00:24 +0800 Subject: [PATCH 1/6] ddl: use callback to refactor column schema test. --- ddl/column_test.go | 176 ++++++++++++++++++++++++--------------------- 1 file changed, 95 insertions(+), 81 deletions(-) diff --git a/ddl/column_test.go b/ddl/column_test.go index 3fd3a85a2c..ca50aa4814 100644 --- a/ddl/column_test.go +++ b/ddl/column_test.go @@ -30,6 +30,16 @@ import ( var _ = Suite(&testColumnSuite{}) +type testDDLCallback struct { + *BaseCallback + + onJobAfterFunc func(*model.Job) +} + +func (tc *testDDLCallback) OnJobRunAfter(job *model.Job) { + tc.onJobAfterFunc(job) +} + type testColumnSuite struct { store kv.Storage dbInfo *model.DBInfo @@ -49,6 +59,7 @@ func (s *testColumnSuite) SetUpSuite(c *C) { func (s *testColumnSuite) TearDownSuite(c *C) { testDropSchema(c, mock.NewContext(), s.d, s.dbInfo) s.d.close() + err := s.store.Close() c.Assert(err, IsNil) } @@ -324,8 +335,11 @@ func (s *testIndexSuite) checkDeleteOnlyColumn(c *C, ctx context.Context, d *ddl func (s *testIndexSuite) checkWriteOnlyColumn(c *C, ctx context.Context, d *ddl, tblInfo *model.TableInfo, handle int64, col *column.Col, row []interface{}, columnValue interface{}, isDropped bool) { t := testGetTable(c, d, s.dbInfo.ID, tblInfo.ID) + _, err := ctx.GetTxn(true) + c.Assert(err, IsNil) + i := int64(0) - err := t.IterRecords(ctx, t.FirstKey(), t.Cols(), func(h int64, data []interface{}, cols []*column.Col) (bool, error) { + err = t.IterRecords(ctx, t.FirstKey(), t.Cols(), func(h int64, data []interface{}, cols []*column.Col) (bool, error) { c.Assert(data, DeepEquals, row) i++ return true, nil @@ -381,9 +395,12 @@ func (s *testIndexSuite) checkWriteOnlyColumn(c *C, ctx context.Context, d *ddl, func (s *testIndexSuite) checkPublicColumn(c *C, ctx context.Context, d *ddl, tblInfo *model.TableInfo, handle int64, col *column.Col, row []interface{}, columnValue interface{}) { t := testGetTable(c, d, s.dbInfo.ID, tblInfo.ID) + _, err := ctx.GetTxn(true) + c.Assert(err, IsNil) + i := int64(0) oldRow := append(row, columnValue) - err := t.IterRecords(ctx, t.FirstKey(), t.Cols(), func(h int64, data []interface{}, cols []*column.Col) (bool, error) { + err = t.IterRecords(ctx, t.FirstKey(), t.Cols(), func(h int64, data []interface{}, cols []*column.Col) (bool, error) { c.Assert(data, DeepEquals, oldRow) i++ return true, nil @@ -461,17 +478,13 @@ func testGetColumn(t *tables.Table, name string) *column.Col { func (s *testIndexSuite) TestAddColumn(c *C) { d := newDDL(s.store, nil, nil, 100*time.Millisecond) - defer d.close() - tblInfo := testTableInfo(c, d, "t", 3) ctx := testNewContext(c, d) - defer ctx.FinishTxn(true) _, err := ctx.GetTxn(true) c.Assert(err, IsNil) testCreateTable(c, ctx, d, s.dbInfo, tblInfo) - defer testDropTable(c, ctx, d, s.dbInfo, tblInfo) t := testGetTable(c, d, s.dbInfo.ID, tblInfo.ID) @@ -479,67 +492,63 @@ func (s *testIndexSuite) TestAddColumn(c *C) { handle, err := t.AddRecord(ctx, row) c.Assert(err, IsNil) - ticker := time.NewTicker(d.lease) - done := make(chan *model.Job, 1) - err = ctx.FinishTxn(false) c.Assert(err, IsNil) colName := "c4" defaultColValue := int64(4) + checkOK := false + + tc := &testDDLCallback{} + tc.onJobAfterFunc = func(job *model.Job) { + if checkOK { + return + } + + t := testGetTable(c, d, s.dbInfo.ID, tblInfo.ID).(*tables.Table) + col := testGetColumn(t, colName) + if col == nil { + return + } + + s.checkAddOrDropColumn(c, col.State, ctx, d, tblInfo, handle, col, row, defaultColValue, false) + if col.State == model.StatePublic { + checkOK = true + } + } + d.hook = tc + + // Use local ddl for callback test. + s.d.close() + + d.close() + d.start() + + job := testCreateColumn(c, ctx, d, s.dbInfo, tblInfo, colName, &ColumnPosition{Type: ColumnPositionNone}, defaultColValue) + testCheckJobDone(c, d, job, true) _, err = ctx.GetTxn(true) c.Assert(err, IsNil) - go func() { - done <- testCreateColumn(c, ctx, s.d, s.dbInfo, tblInfo, colName, &ColumnPosition{Type: ColumnPositionNone}, defaultColValue) - }() + job = testDropTable(c, ctx, d, s.dbInfo, tblInfo) + testCheckJobDone(c, d, job, false) - lastCheckState := model.StateNone - col := &column.Col{} + err = ctx.FinishTxn(false) + c.Assert(err, IsNil) - for { - select { - case job := <-done: - testCheckJobDone(c, d, job, true) - s.checkAddOrDropColumn(c, model.StatePublic, ctx, d, tblInfo, handle, col, row, defaultColValue, false) - return - case <-ticker.C: - d.close() - - t := testGetTable(c, d, s.dbInfo.ID, tblInfo.ID).(*tables.Table) - col = testGetColumn(t, colName) - if col == nil { - continue - } - - // Here means column state is not changed, so just skipped. - if lastCheckState == col.State { - continue - } - - s.checkAddOrDropColumn(c, col.State, ctx, d, tblInfo, handle, col, row, defaultColValue, false) - - lastCheckState = col.State - - d.start() - } - } + d.close() + s.d.start() } func (s *testIndexSuite) TestDropColumn(c *C) { d := newDDL(s.store, nil, nil, 100*time.Millisecond) - defer d.close() - tblInfo := testTableInfo(c, d, "t", 4) ctx := testNewContext(c, d) - defer ctx.FinishTxn(true) _, err := ctx.GetTxn(true) c.Assert(err, IsNil) testCreateTable(c, ctx, d, s.dbInfo, tblInfo) - defer testDropTable(c, ctx, d, s.dbInfo, tblInfo) t := testGetTable(c, d, s.dbInfo.ID, tblInfo.ID) @@ -549,44 +558,49 @@ func (s *testIndexSuite) TestDropColumn(c *C) { handle, err := t.AddRecord(ctx, append(row, defaultColValue)) c.Assert(err, IsNil) - ticker := time.NewTicker(d.lease) - done := make(chan *model.Job, 1) + err = ctx.FinishTxn(false) + c.Assert(err, IsNil) + + checkOK := false + + tc := &testDDLCallback{} + tc.onJobAfterFunc = func(job *model.Job) { + if checkOK { + return + } + + t := testGetTable(c, d, s.dbInfo.ID, tblInfo.ID).(*tables.Table) + col := testGetColumn(t, colName) + if col == nil { + return + } + + s.checkAddOrDropColumn(c, col.State, ctx, d, tblInfo, handle, col, row, defaultColValue, true) + + if col.State == model.StateNone { + checkOK = true + } + } + d.hook = tc + + // Use local ddl for callback test. + s.d.close() + + d.close() + d.start() + + job := testDropColumn(c, ctx, s.d, s.dbInfo, tblInfo, colName, false) + testCheckJobDone(c, d, job, false) + + _, err = ctx.GetTxn(true) + c.Assert(err, IsNil) + + job = testDropTable(c, ctx, d, s.dbInfo, tblInfo) + testCheckJobDone(c, d, job, false) err = ctx.FinishTxn(false) c.Assert(err, IsNil) - go func() { - done <- testDropColumn(c, ctx, s.d, s.dbInfo, tblInfo, colName, false) - }() - - lastCheckState := model.StateNone - col := &column.Col{} - - for { - select { - case job := <-done: - testCheckJobDone(c, d, job, false) - s.checkAddOrDropColumn(c, model.StateNone, ctx, d, tblInfo, handle, col, row, defaultColValue, false) - return - case <-ticker.C: - d.close() - - t := testGetTable(c, d, s.dbInfo.ID, tblInfo.ID).(*tables.Table) - col = testGetColumn(t, colName) - if col == nil { - return - } - - // Here means column state is not changed, so just skipped. - if lastCheckState == col.State { - continue - } - - s.checkAddOrDropColumn(c, col.State, ctx, d, tblInfo, handle, col, row, defaultColValue, true) - - lastCheckState = col.State - - d.start() - } - } + d.close() + s.d.start() } From 937b49943301a35c91ee61cf5bdf473f43a6aa21 Mon Sep 17 00:00:00 2001 From: siddontang Date: Tue, 10 Nov 2015 08:40:09 +0800 Subject: [PATCH 2/6] ddl: adjust job callback place --- ddl/worker.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ddl/worker.go b/ddl/worker.go index 1352a63f30..b0959a800a 100644 --- a/ddl/worker.go +++ b/ddl/worker.go @@ -198,8 +198,6 @@ func (d *ddl) handleJobQueue() error { // and retry later if the job is not cancelled. d.runJob(t, job) - d.hook.OnJobRunAfter(job) - if job.State == model.JobDone || job.State == model.JobCancelled { err = d.finishJob(t, job) if err == nil { @@ -229,6 +227,8 @@ func (d *ddl) handleJobQueue() error { return nil } + d.hook.OnJobRunAfter(job) + // here means the job enters another state (delete only, write only, public, etc...) or is cancelled. // if the job is done or still running, we will wait 2 * lease time to guarantee other servers to update // the newest schema. From aff1496da77344a25e0584685e46a5c79f445bbc Mon Sep 17 00:00:00 2001 From: siddontang Date: Tue, 10 Nov 2015 08:40:24 +0800 Subject: [PATCH 3/6] ddl: fix column test panic. --- ddl/column_test.go | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/ddl/column_test.go b/ddl/column_test.go index ca50aa4814..809b2e35bd 100644 --- a/ddl/column_test.go +++ b/ddl/column_test.go @@ -511,7 +511,10 @@ func (s *testIndexSuite) TestAddColumn(c *C) { return } - s.checkAddOrDropColumn(c, col.State, ctx, d, tblInfo, handle, col, row, defaultColValue, false) + ctx1 := testNewContext(c, d) + defer ctx.FinishTxn(false) + + s.checkAddOrDropColumn(c, col.State, ctx1, d, tblInfo, handle, col, row, defaultColValue, false) if col.State == model.StatePublic { checkOK = true } @@ -572,10 +575,13 @@ func (s *testIndexSuite) TestDropColumn(c *C) { t := testGetTable(c, d, s.dbInfo.ID, tblInfo.ID).(*tables.Table) col := testGetColumn(t, colName) if col == nil { + checkOK = true return } - s.checkAddOrDropColumn(c, col.State, ctx, d, tblInfo, handle, col, row, defaultColValue, true) + ctx1 := testNewContext(c, d) + defer ctx.FinishTxn(false) + s.checkAddOrDropColumn(c, col.State, ctx1, d, tblInfo, handle, col, row, defaultColValue, true) if col.State == model.StateNone { checkOK = true From 8d6d727d271b2a952f939057f5906cac91840aca Mon Sep 17 00:00:00 2001 From: siddontang Date: Tue, 10 Nov 2015 08:44:06 +0800 Subject: [PATCH 4/6] ddl: update column test. --- ddl/column_test.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/ddl/column_test.go b/ddl/column_test.go index 809b2e35bd..c960a288d8 100644 --- a/ddl/column_test.go +++ b/ddl/column_test.go @@ -512,9 +512,8 @@ func (s *testIndexSuite) TestAddColumn(c *C) { } ctx1 := testNewContext(c, d) - defer ctx.FinishTxn(false) - s.checkAddOrDropColumn(c, col.State, ctx1, d, tblInfo, handle, col, row, defaultColValue, false) + ctx.FinishTxn(false) if col.State == model.StatePublic { checkOK = true } @@ -580,8 +579,8 @@ func (s *testIndexSuite) TestDropColumn(c *C) { } ctx1 := testNewContext(c, d) - defer ctx.FinishTxn(false) s.checkAddOrDropColumn(c, col.State, ctx1, d, tblInfo, handle, col, row, defaultColValue, true) + ctx.FinishTxn(false) if col.State == model.StateNone { checkOK = true From 85126b094262588110343980ce771ff5e24f0839 Mon Sep 17 00:00:00 2001 From: siddontang Date: Tue, 10 Nov 2015 09:34:43 +0800 Subject: [PATCH 5/6] ddl: OnJobRunAfter -> OnJobUpdated --- ddl/callback.go | 8 ++++---- ddl/callback_test.go | 23 +++++++++++++++++++++++ ddl/column_test.go | 10 +++++----- ddl/worker.go | 2 +- 4 files changed, 33 insertions(+), 10 deletions(-) create mode 100644 ddl/callback_test.go diff --git a/ddl/callback.go b/ddl/callback.go index e68467cdab..4c064de2fc 100644 --- a/ddl/callback.go +++ b/ddl/callback.go @@ -21,8 +21,8 @@ type Callback interface { OnChanged(err error) error // OnJobRunBefore is called before running job. OnJobRunBefore(job *model.Job) - // OnJobRunAfter is called after job is run. - OnJobRunAfter(job *model.Job) + // OnJobUpdated is called after the running job is updated. + OnJobUpdated(job *model.Job) } // BaseCallback implements Callback.OnChanged interface. @@ -39,7 +39,7 @@ func (c *BaseCallback) OnJobRunBefore(job *model.Job) { // Nothing to do. } -// OnJobRunAfter implements Callback.OnJobRunAfter interface. -func (c *BaseCallback) OnJobRunAfter(job *model.Job) { +// OnJobUpdated implements Callback.OnJobUpdated interface. +func (c *BaseCallback) OnJobUpdated(job *model.Job) { // Nothing to do. } diff --git a/ddl/callback_test.go b/ddl/callback_test.go new file mode 100644 index 0000000000..0b43d2a21c --- /dev/null +++ b/ddl/callback_test.go @@ -0,0 +1,23 @@ +// Copyright 2015 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package ddl + +import . "github.com/pingcap/check" + +func (s *testDDLSuite) TestCallback(c *C) { + cb := &BaseCallback{} + c.Assert(cb.OnChanged(nil), IsNil) + cb.OnJobRunBefore(nil) + cb.OnJobUpdated(nil) +} diff --git a/ddl/column_test.go b/ddl/column_test.go index c960a288d8..5719e440be 100644 --- a/ddl/column_test.go +++ b/ddl/column_test.go @@ -33,11 +33,11 @@ var _ = Suite(&testColumnSuite{}) type testDDLCallback struct { *BaseCallback - onJobAfterFunc func(*model.Job) + onJobUpdated func(*model.Job) } -func (tc *testDDLCallback) OnJobRunAfter(job *model.Job) { - tc.onJobAfterFunc(job) +func (tc *testDDLCallback) OnJobUpdated(job *model.Job) { + tc.onJobUpdated(job) } type testColumnSuite struct { @@ -500,7 +500,7 @@ func (s *testIndexSuite) TestAddColumn(c *C) { checkOK := false tc := &testDDLCallback{} - tc.onJobAfterFunc = func(job *model.Job) { + tc.onJobUpdated = func(job *model.Job) { if checkOK { return } @@ -566,7 +566,7 @@ func (s *testIndexSuite) TestDropColumn(c *C) { checkOK := false tc := &testDDLCallback{} - tc.onJobAfterFunc = func(job *model.Job) { + tc.onJobUpdated = func(job *model.Job) { if checkOK { return } diff --git a/ddl/worker.go b/ddl/worker.go index b0959a800a..e2d7310b7d 100644 --- a/ddl/worker.go +++ b/ddl/worker.go @@ -227,7 +227,7 @@ func (d *ddl) handleJobQueue() error { return nil } - d.hook.OnJobRunAfter(job) + d.hook.OnJobUpdated(job) // here means the job enters another state (delete only, write only, public, etc...) or is cancelled. // if the job is done or still running, we will wait 2 * lease time to guarantee other servers to update From bf7fbd420c593c43c8aed5eaf045c8b880a5ce3f Mon Sep 17 00:00:00 2001 From: qiuyesuifeng Date: Tue, 10 Nov 2015 10:58:02 +0800 Subject: [PATCH 6/6] ddl: add state none check for drop column. --- ddl/column_test.go | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/ddl/column_test.go b/ddl/column_test.go index 5719e440be..a5dd0b8e72 100644 --- a/ddl/column_test.go +++ b/ddl/column_test.go @@ -451,7 +451,9 @@ func (s *testIndexSuite) checkPublicColumn(c *C, ctx context.Context, d *ddl, tb c.Assert(err, IsNil) } -func (s *testIndexSuite) checkAddOrDropColumn(c *C, state model.SchemaState, ctx context.Context, d *ddl, tblInfo *model.TableInfo, handle int64, col *column.Col, row []interface{}, columnValue interface{}, isDropped bool) { +func (s *testIndexSuite) checkAddOrDropColumn(c *C, state model.SchemaState, d *ddl, tblInfo *model.TableInfo, handle int64, col *column.Col, row []interface{}, columnValue interface{}, isDropped bool) { + ctx := testNewContext(c, d) + switch state { case model.StateNone: s.checkNoneColumn(c, ctx, d, tblInfo, handle, col, columnValue) @@ -511,13 +513,13 @@ func (s *testIndexSuite) TestAddColumn(c *C) { return } - ctx1 := testNewContext(c, d) - s.checkAddOrDropColumn(c, col.State, ctx1, d, tblInfo, handle, col, row, defaultColValue, false) - ctx.FinishTxn(false) + s.checkAddOrDropColumn(c, col.State, d, tblInfo, handle, col, row, defaultColValue, false) + if col.State == model.StatePublic { checkOK = true } } + d.hook = tc // Use local ddl for callback test. @@ -564,6 +566,7 @@ func (s *testIndexSuite) TestDropColumn(c *C) { c.Assert(err, IsNil) checkOK := false + oldCol := &column.Col{} tc := &testDDLCallback{} tc.onJobUpdated = func(job *model.Job) { @@ -574,18 +577,15 @@ func (s *testIndexSuite) TestDropColumn(c *C) { t := testGetTable(c, d, s.dbInfo.ID, tblInfo.ID).(*tables.Table) col := testGetColumn(t, colName) if col == nil { + s.checkAddOrDropColumn(c, model.StateNone, d, tblInfo, handle, oldCol, row, defaultColValue, true) checkOK = true return } - ctx1 := testNewContext(c, d) - s.checkAddOrDropColumn(c, col.State, ctx1, d, tblInfo, handle, col, row, defaultColValue, true) - ctx.FinishTxn(false) - - if col.State == model.StateNone { - checkOK = true - } + s.checkAddOrDropColumn(c, col.State, d, tblInfo, handle, col, row, defaultColValue, true) + oldCol = col } + d.hook = tc // Use local ddl for callback test.