diff --git a/ddl/constraint.go b/ddl/constraint.go index 3a4639c87f..e4b8856f03 100644 --- a/ddl/constraint.go +++ b/ddl/constraint.go @@ -29,6 +29,7 @@ import ( "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/sessionctx" + "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/util/dbterror" "github.com/pingcap/tidb/util/sqlexec" ) @@ -86,12 +87,14 @@ func (w *worker) onAddCheckConstraint(d *ddlCtx, t *meta.Meta, job *model.Job) ( originalState := constraintInfoInMeta.State switch constraintInfoInMeta.State { case model.StateNone: - // none -> write only job.SchemaState = model.StateWriteOnly constraintInfoInMeta.State = model.StateWriteOnly ver, err = updateVersionAndTableInfoWithCheck(d, t, job, tblInfo, originalState != constraintInfoInMeta.State) case model.StateWriteOnly: - // write only -> public + job.SchemaState = model.StateWriteReorganization + constraintInfoInMeta.State = model.StateWriteReorganization + ver, err = updateVersionAndTableInfoWithCheck(d, t, job, tblInfo, originalState != constraintInfoInMeta.State) + case model.StateWriteReorganization: err = w.verifyRemainRecordsForCheckConstraint(dbInfo, tblInfo, constraintInfoInMeta, job) if err != nil { return ver, errors.Trace(err) @@ -151,12 +154,10 @@ func onDropCheckConstraint(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, originalState := constraintInfo.State switch constraintInfo.State { case model.StatePublic: - // public -> write only job.SchemaState = model.StateWriteOnly constraintInfo.State = model.StateWriteOnly ver, err = updateVersionAndTableInfoWithCheck(d, t, job, tblInfo, originalState != constraintInfo.State) case model.StateWriteOnly: - // write only -> None // write only state constraint will still take effect to check the newly inserted data. // So the dependent column shouldn't be dropped even in this intermediate state. constraintInfo.State = model.StateNone @@ -205,29 +206,50 @@ func checkDropCheckConstraint(t *meta.Meta, job *model.Job) (*model.TableInfo, * return tblInfo, constraintInfo, nil } -func (w *worker) onAlterCheckConstraint(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error) { +func (w *worker) onAlterCheckConstraint(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error) { dbInfo, tblInfo, constraintInfo, enforced, err := checkAlterCheckConstraint(t, job) if err != nil { return ver, errors.Trace(err) } // enforced will fetch table data and check the constraint. - if constraintInfo.Enforced != enforced && enforced { - err = w.verifyRemainRecordsForCheckConstraint(dbInfo, tblInfo, constraintInfo, job) + if enforced { + originalState := constraintInfo.State + switch constraintInfo.State { + case model.StatePublic: + job.SchemaState = model.StateWriteReorganization + constraintInfo.State = model.StateWriteReorganization + constraintInfo.Enforced = enforced + ver, err = updateVersionAndTableInfoWithCheck(d, t, job, tblInfo, originalState != constraintInfo.State) + case model.StateWriteReorganization: + job.SchemaState = model.StateWriteOnly + constraintInfo.State = model.StateWriteOnly + ver, err = updateVersionAndTableInfoWithCheck(d, t, job, tblInfo, originalState != constraintInfo.State) + case model.StateWriteOnly: + err = w.verifyRemainRecordsForCheckConstraint(dbInfo, tblInfo, constraintInfo, job) + if err != nil { + if !table.ErrCheckConstraintViolated.Equal(err) { + return ver, errors.Trace(err) + } + constraintInfo.Enforced = !enforced + } + constraintInfo.State = model.StatePublic + ver, err = updateVersionAndTableInfoWithCheck(d, t, job, tblInfo, originalState != constraintInfo.State) + if err != nil { + return ver, errors.Trace(err) + } + job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo) + } + } else { + constraintInfo.Enforced = enforced + ver, err = updateVersionAndTableInfoWithCheck(d, t, job, tblInfo, true) if err != nil { - // check constraint error will cancel the job, job state has been changed - // to cancelled in addTableCheckConstraint. + // update version and tableInfo error will cause retry. return ver, errors.Trace(err) } + job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo) } - constraintInfo.Enforced = enforced - ver, err = updateVersionAndTableInfoWithCheck(d, t, job, tblInfo, true) - if err != nil { - // update version and tableInfo error will cause retry. - return ver, errors.Trace(err) - } - job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo) - return ver, nil + return ver, err } func checkAlterCheckConstraint(t *meta.Meta, job *model.Job) (*model.DBInfo, *model.TableInfo, *model.ConstraintInfo, bool, error) { @@ -250,7 +272,6 @@ func checkAlterCheckConstraint(t *meta.Meta, job *model.Job) (*model.DBInfo, *mo job.State = model.JobStateCancelled return nil, nil, nil, false, errors.Trace(err) } - // do the double check with constraint existence. constraintInfo := tblInfo.FindConstraintInfoByName(constrName.L) if constraintInfo == nil { @@ -316,6 +337,12 @@ func findDependentColsInExpr(expr ast.ExprNode) map[string]struct{} { } func (w *worker) verifyRemainRecordsForCheckConstraint(dbInfo *model.DBInfo, tableInfo *model.TableInfo, constr *model.ConstraintInfo, job *model.Job) error { + // Inject a fail-point to skip the remaining records check. + failpoint.Inject("mockVerifyRemainDataSuccess", func(val failpoint.Value) { + if val.(bool) { + failpoint.Return(nil) + } + }) // Get sessionctx from ddl context resource pool in ddl worker. var sctx sessionctx.Context sctx, err := w.sessPool.Get() diff --git a/ddl/constraint_test.go b/ddl/constraint_test.go index cd4baa3bd9..838d5cd4c9 100644 --- a/ddl/constraint_test.go +++ b/ddl/constraint_test.go @@ -19,8 +19,11 @@ import ( "sort" "testing" + "github.com/pingcap/failpoint" "github.com/pingcap/tidb/ddl/util/callback" "github.com/pingcap/tidb/parser/model" + "github.com/pingcap/tidb/table" + "github.com/pingcap/tidb/table/tables" "github.com/pingcap/tidb/testkit" "github.com/pingcap/tidb/testkit/external" "github.com/stretchr/testify/require" @@ -893,10 +896,7 @@ func TestAlterConstraintAddDrop(t *testing.T) { } originalCallback.OnChanged(nil) if job.SchemaState == model.StateWriteOnly { - // StateNone -> StateWriteOnly -> StatePublic - // Node in StateWriteOnly and StatePublic should check the constraint check. _, checkErr = tk1.Exec("insert into t (a, b) values(5,6) ") - // Don't do the assert in the callback function. } } callback.OnJobUpdatedExported.Store(&onJobUpdatedExportedFunc) @@ -908,3 +908,218 @@ func TestAlterConstraintAddDrop(t *testing.T) { require.Errorf(t, err, "[table:3819]Check constraint 'cc' is violated.") tk.MustExec("drop table if exists t") } + +func TestAlterAddConstraintStateChange(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("drop table if exists t") + tk.MustExec("create table t (a int)") + + tk1 := testkit.NewTestKit(t, store) + tk1.MustExec("use test") + tk1.MustExec("insert into t values(12)") + + var checkErr error + d := dom.DDL() + originalCallback := d.GetHook() + callback := &callback.TestDDLCallback{} + onJobUpdatedExportedFunc := func(job *model.Job) { + if checkErr != nil { + return + } + originalCallback.OnChanged(nil) + if job.SchemaState == model.StateWriteReorganization { + tk1.MustQuery(fmt.Sprintf("select count(1) from `%s`.`%s` where not %s limit 1", "test", "t", "a > 10")).Check(testkit.Rows("0")) + // set constraint state + constraintTable := external.GetTableByName(t, tk1, "test", "t") + tableCommon, ok := constraintTable.(*tables.TableCommon) + require.True(t, ok) + originCons := tableCommon.Constraints + tableCommon.WritableConstraints = []*table.Constraint{} + tableCommon.Constraints = []*table.Constraint{} + // insert data + tk1.MustExec("insert into t values(1)") + // recover + tableCommon.Constraints = originCons + tableCommon.WritableConstraint() + } + } + + //StatNone StateWriteReorganization + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/ddl/mockVerifyRemainDataSuccess", "return(true)")) + callback.OnJobUpdatedExported.Store(&onJobUpdatedExportedFunc) + d.SetHook(callback) + tk.MustExec("alter table t add constraint c0 check ( a > 10)") + tk.MustQuery("select * from t").Check(testkit.Rows("12", "1")) + tk.MustQuery("show create table t").Check(testkit.Rows("t CREATE TABLE `t` (\n `a` int(11) DEFAULT NULL,\nCONSTRAINT `c0` CHECK ((`a` > 10))\n) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin")) + tk.MustExec("alter table t drop constraint c0") + tk.MustExec("delete from t where a = 1") + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/ddl/mockVerifyRemainDataSuccess")) +} + +func TestAlterAddConstraintStateChange1(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("drop table if exists t") + tk.MustExec("create table t (a int)") + tk1 := testkit.NewTestKit(t, store) + tk1.MustExec("use test") + tk1.MustExec("insert into t values(12)") + + var checkErr error + d := dom.DDL() + originalCallback := d.GetHook() + callback := &callback.TestDDLCallback{} + // StatNone -> StateWriteOnly + onJobUpdatedExportedFunc1 := func(job *model.Job) { + if checkErr != nil { + return + } + originalCallback.OnChanged(nil) + if job.SchemaState == model.StateWriteOnly { + // set constraint state + constraintTable := external.GetTableByName(t, tk1, "test", "t") + tableCommon, ok := constraintTable.(*tables.TableCommon) + require.True(t, ok) + originCons := tableCommon.Constraints + tableCommon.WritableConstraints = []*table.Constraint{} + tableCommon.Constraints = []*table.Constraint{} + // insert data + tk1.MustExec("insert into t values(1)") + // recover + tableCommon.Constraints = originCons + tableCommon.WritableConstraint() + } + } + callback.OnJobUpdatedExported.Store(&onJobUpdatedExportedFunc1) + d.SetHook(callback) + tk.MustGetErrMsg("alter table t add constraint c1 check ( a > 10)", "[ddl:3819]Check constraint 'c1' is violated.") + tk.MustQuery("select * from t").Check(testkit.Rows("12", "1")) + tk.MustQuery("show create table t").Check(testkit.Rows("t CREATE TABLE `t` (\n `a` int(11) DEFAULT NULL\n) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin")) + tk.MustExec("delete from t where a = 1") +} + +func TestAlterAddConstraintStateChange2(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("drop table if exists t") + tk.MustExec("create table t (a int)") + tk1 := testkit.NewTestKit(t, store) + tk1.MustExec("use test") + tk1.MustExec("insert into t values(12)") + + var checkErr error + d := dom.DDL() + originalCallback := d.GetHook() + callback := &callback.TestDDLCallback{} + // StateWriteOnly -> StateWriteReorganization + onJobUpdatedExportedFunc2 := func(job *model.Job) { + if checkErr != nil { + return + } + originalCallback.OnChanged(nil) + if job.SchemaState == model.StateWriteReorganization { + // set constraint state + constraintTable := external.GetTableByName(t, tk1, "test", "t") + tableCommon, ok := constraintTable.(*tables.TableCommon) + require.True(t, ok) + tableCommon.Constraints[0].State = model.StateWriteOnly + tableCommon.WritableConstraints = []*table.Constraint{} + // insert data + tk1.MustGetErrMsg("insert into t values(1)", "[table:3819]Check constraint 'c2' is violated.") + // recover + tableCommon.Constraints[0].State = model.StateWriteReorganization + tableCommon.WritableConstraint() + } + } + callback.OnJobUpdatedExported.Store(&onJobUpdatedExportedFunc2) + d.SetHook(callback) + tk.MustExec("alter table t add constraint c2 check ( a > 10)") + tk.MustQuery("select * from t").Check(testkit.Rows("12")) + tk.MustQuery("show create table t").Check(testkit.Rows("t CREATE TABLE `t` (\n `a` int(11) DEFAULT NULL,\nCONSTRAINT `c2` CHECK ((`a` > 10))\n) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin")) + tk.MustExec("alter table t drop constraint c2") +} + +func TestAlterAddConstraintStateChange3(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("drop table if exists t") + tk.MustExec("create table t (a int)") + tk1 := testkit.NewTestKit(t, store) + tk1.MustExec("use test") + tk1.MustExec("insert into t values(12)") + + var checkErr error + d := dom.DDL() + originalCallback := d.GetHook() + callback := &callback.TestDDLCallback{} + // StateWriteReorganization -> StatePublic + onJobUpdatedExportedFunc3 := func(job *model.Job) { + if checkErr != nil { + return + } + originalCallback.OnChanged(nil) + if job.SchemaState == model.StatePublic { + // set constraint state + constraintTable := external.GetTableByName(t, tk1, "test", "t") + tableCommon, ok := constraintTable.(*tables.TableCommon) + require.True(t, ok) + tableCommon.Constraints[0].State = model.StateWriteReorganization + tableCommon.WritableConstraints = []*table.Constraint{} + // insert data + tk1.MustGetErrMsg("insert into t values(1)", "[table:3819]Check constraint 'c3' is violated.") + // recover + tableCommon.Constraints[0].State = model.StatePublic + tableCommon.WritableConstraint() + } + } + callback.OnJobUpdatedExported.Store(&onJobUpdatedExportedFunc3) + d.SetHook(callback) + tk.MustExec("alter table t add constraint c3 check ( a > 10)") + tk.MustQuery("select * from t").Check(testkit.Rows("12")) + tk.MustQuery("show create table t").Check(testkit.Rows("t CREATE TABLE `t` (\n `a` int(11) DEFAULT NULL,\nCONSTRAINT `c3` CHECK ((`a` > 10))\n) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin")) +} + +func TestAlterEnforcedConstraintStateChange(t *testing.T) { + store, dom := testkit.CreateMockStoreAndDomain(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("drop table if exists t") + tk.MustExec("create table t (a int, constraint c1 check (a > 10) not enforced)") + tk1 := testkit.NewTestKit(t, store) + tk1.MustExec("use test") + tk1.MustExec("insert into t values(12)") + + var checkErr error + d := dom.DDL() + originalCallback := d.GetHook() + callback := &callback.TestDDLCallback{} + // StateWriteReorganization -> StatePublic + onJobUpdatedExportedFunc3 := func(job *model.Job) { + if checkErr != nil { + return + } + originalCallback.OnChanged(nil) + if job.SchemaState == model.StateWriteReorganization { + // set constraint state + constraintTable := external.GetTableByName(t, tk1, "test", "t") + tableCommon, ok := constraintTable.(*tables.TableCommon) + require.True(t, ok) + tableCommon.Constraints[0].State = model.StateWriteOnly + tableCommon.WritableConstraints = []*table.Constraint{} + // insert data + tk1.MustGetErrMsg("insert into t values(1)", "[table:3819]Check constraint 'c1' is violated.") + // recover + tableCommon.Constraints[0].State = model.StateWriteReorganization + tableCommon.WritableConstraint() + } + } + callback.OnJobUpdatedExported.Store(&onJobUpdatedExportedFunc3) + d.SetHook(callback) + tk.MustExec("alter table t alter constraint c1 enforced") + tk.MustQuery("select * from t").Check(testkit.Rows("12")) +}