ddl: fix bug where the DDL job updating TiFlash replica status gets stuck (#15001)

This commit is contained in:
crazycs
2020-03-05 15:04:41 +08:00
committed by GitHub
parent 6ccdf645dc
commit c124fca594
2 changed files with 68 additions and 25 deletions

View File

@ -850,25 +850,7 @@ func (s *testStateChangeSuite) TestParallelCreateAndRename(c *C) {
// checkRet is the verification callback accepted by testControlParallelExecSQL.
// It is invoked once, after both statements have finished, with the test
// context and the two resulting errors (err1 comes from executing sql1;
// err2 is presumably the result of sql2 — confirm against the full function).
type checkRet func(c *C, err1, err2 error)
func (s *testStateChangeSuiteBase) testControlParallelExecSQL(c *C, sql1, sql2 string, f checkRet) {
_, err := s.se.Execute(context.Background(), "use test_db_state")
c.Assert(err, IsNil)
_, err = s.se.Execute(context.Background(), "create table t(a int, b int, c int, d int auto_increment,e int, index idx1(d), index idx2(d,e))")
c.Assert(err, IsNil)
if len(s.preSQL) != 0 {
_, err := s.se.Execute(context.Background(), s.preSQL)
c.Assert(err, IsNil)
}
defer s.se.Execute(context.Background(), "drop table t")
_, err = s.se.Execute(context.Background(), "drop database if exists t_part")
c.Assert(err, IsNil)
s.se.Execute(context.Background(), `create table t_part (a int key)
partition by range(a) (
partition p0 values less than (10),
partition p1 values less than (20)
);`)
func (s *testStateChangeSuiteBase) prepareTestControlParallelExecSQL(c *C) (session.Session, session.Session, chan struct{}, ddl.Callback) {
callback := &ddl.TestDDLCallback{}
times := 0
callback.OnJobUpdatedExported = func(job *model.Job) {
@ -894,12 +876,8 @@ func (s *testStateChangeSuiteBase) testControlParallelExecSQL(c *C, sql1, sql2 s
}
d := s.dom.DDL()
originalCallback := d.GetHook()
defer d.(ddl.DDLForTest).SetHook(originalCallback)
d.(ddl.DDLForTest).SetHook(callback)
wg := sync.WaitGroup{}
var err1 error
var err2 error
se, err := session.CreateSession(s.store)
c.Assert(err, IsNil)
_, err = se.Execute(context.Background(), "use test_db_state")
@ -908,7 +886,6 @@ func (s *testStateChangeSuiteBase) testControlParallelExecSQL(c *C, sql1, sql2 s
c.Assert(err, IsNil)
_, err = se1.Execute(context.Background(), "use test_db_state")
c.Assert(err, IsNil)
wg.Add(2)
ch := make(chan struct{})
// Make sure the sql1 is put into the DDLJobQueue.
go func() {
@ -930,6 +907,35 @@ func (s *testStateChangeSuiteBase) testControlParallelExecSQL(c *C, sql1, sql2 s
time.Sleep(5 * time.Millisecond)
}
}()
return se, se1, ch, originalCallback
}
func (s *testStateChangeSuiteBase) testControlParallelExecSQL(c *C, sql1, sql2 string, f checkRet) {
_, err := s.se.Execute(context.Background(), "use test_db_state")
c.Assert(err, IsNil)
_, err = s.se.Execute(context.Background(), "create table t(a int, b int, c int, d int auto_increment,e int, index idx1(d), index idx2(d,e))")
c.Assert(err, IsNil)
if len(s.preSQL) != 0 {
_, err := s.se.Execute(context.Background(), s.preSQL)
c.Assert(err, IsNil)
}
defer s.se.Execute(context.Background(), "drop table t")
_, err = s.se.Execute(context.Background(), "drop database if exists t_part")
c.Assert(err, IsNil)
s.se.Execute(context.Background(), `create table t_part (a int key)
partition by range(a) (
partition p0 values less than (10),
partition p1 values less than (20)
);`)
se, se1, ch, originalCallback := s.prepareTestControlParallelExecSQL(c)
defer s.dom.DDL().(ddl.DDLForTest).SetHook(originalCallback)
var err1 error
var err2 error
wg := sync.WaitGroup{}
wg.Add(2)
go func() {
defer wg.Done()
_, err1 = se.Execute(context.Background(), sql1)
@ -944,6 +950,42 @@ func (s *testStateChangeSuiteBase) testControlParallelExecSQL(c *C, sql1, sql2 s
f(c, err1, err2)
}
// TestParallelUpdateTableReplica verifies the outcome of two sessions
// reporting the same TiFlash replica availability for one table: the first
// UpdateTableReplicaInfo call must succeed, and the second must fail with an
// "already updated" error instead of leaving its DDL job stuck.
func (s *testStateChangeSuite) TestParallelUpdateTableReplica(c *C) {
	ctx := context.Background()
	_, err := s.se.Execute(ctx, "use test_db_state")
	c.Assert(err, IsNil)
	_, err = s.se.Execute(ctx, "drop table if exists t1;")
	c.Assert(err, IsNil)
	_, err = s.se.Execute(ctx, "create table t1 (a int);")
	c.Assert(err, IsNil)
	_, err = s.se.Execute(ctx, "alter table t1 set tiflash replica 3 location labels 'a','b';")
	c.Assert(err, IsNil)

	se, se1, ch, originalCallback := s.prepareTestControlParallelExecSQL(c)
	// Restore the DDL hook that the prepare step replaced.
	defer s.dom.DDL().(ddl.DDLForTest).SetHook(originalCallback)

	t1 := testGetTableByName(c, se, "test_db_state", "t1")

	var err1 error
	var err2 error
	wg := sync.WaitGroup{}
	wg.Add(2)
	go func() {
		defer wg.Done()
		// Mock the report that the table's TiFlash replica has become available.
		err1 = domain.GetDomain(se).DDL().UpdateTableReplicaInfo(se, t1.Meta().ID, true)
	}()
	go func() {
		defer wg.Done()
		// Wait for the signal from the prepare step before sending the
		// duplicate report from the second session.
		<-ch
		// Mock a second, identical availability report for the same table.
		err2 = domain.GetDomain(se1).DDL().UpdateTableReplicaInfo(se1, t1.Meta().ID, true)
	}()
	wg.Wait()

	c.Assert(err1, IsNil)
	// Assert non-nil first so that a regression surfaces as a normal test
	// failure rather than a nil-pointer panic on err2.Error().
	c.Assert(err2, NotNil)
	c.Assert(err2.Error(), Equals, "[ddl:-1]the replica available status of table t1 is already updated")
}
func (s *testStateChangeSuite) testParallelExecSQL(c *C, sql string) {
se, err := session.CreateSession(s.store)
c.Assert(err, IsNil)

View File

@ -748,7 +748,8 @@ func onUpdateFlashReplicaStatus(t *meta.Meta, job *model.Job) (ver int64, _ erro
}
if tblInfo.TiFlashReplica == nil || (tblInfo.ID == physicalID && tblInfo.TiFlashReplica.Available == available) ||
(tblInfo.ID != physicalID && available == tblInfo.TiFlashReplica.IsPartitionAvailable(physicalID)) {
return ver, nil
job.State = model.JobStateCancelled
return ver, errors.Errorf("the replica available status of table %s is already updated", tblInfo.Name.String())
}
if tblInfo.ID == physicalID {