ddl: Delete TiFlash sync status from etcd when table is truncated or dropped (#37184)
close pingcap/tidb#37168
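
For orientation: the core of the change is a small guard added to both onDropTableOrView and onTruncateTable in ddl/table.go, shown in the hunks below. When the table carries a TiFlash replica, its sync-progress entry is deleted from etcd as part of the DDL job, and a deletion failure is logged rather than failing the job. The snippet here only condenses those two hunks for readability; it is not an additional call site.

	// Condensed from the ddl/table.go hunks below (illustrative only).
	if tblInfo.TiFlashReplica != nil {
		// Remove the etcd-backed sync progress entry for this table ID.
		// Deletion failure is non-fatal for the DDL job, so it is only logged.
		if e := infosync.DeleteTiFlashTableSyncProgress(tblInfo.ID); e != nil {
			logutil.BgLogger().Error("DeleteTiFlashTableSyncProgress fails", zap.Error(e))
		}
	}
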
@@ -45,6 +45,7 @@ import (
	"github.com/pingcap/tidb/util/logutil"
	"github.com/stretchr/testify/require"
	"github.com/tikv/client-go/v2/testutils"
	"go.etcd.io/etcd/tests/v3/integration"
	"go.uber.org/zap"
)

@@ -946,6 +947,54 @@ func TestTiFlashBatchUnsupported(t *testing.T) {
	tk.MustGetErrCode("alter database information_schema set tiflash replica 1", 8200)
}

func TestTiFlashProgress(t *testing.T) {
	s, teardown := createTiFlashContext(t)
	s.tiflash.NotAvailable = true
	defer teardown()
	tk := testkit.NewTestKit(t, s.store)

	integration.BeforeTest(t, integration.WithoutGoLeakDetection())
	cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
	defer cluster.Terminate(t)

	save := infosync.GetEtcdClient()
	defer infosync.SetEtcdClient(save)
	infosync.SetEtcdClient(cluster.Client(0))
	tk.MustExec("create database tiflash_d")
	tk.MustExec("create table tiflash_d.t(z int)")
	tk.MustExec("alter table tiflash_d.t set tiflash replica 1")
	tb, err := s.dom.InfoSchema().TableByName(model.NewCIStr("tiflash_d"), model.NewCIStr("t"))
	require.NoError(t, err)
	require.NotNil(t, tb)
	mustExist := func(tid int64) {
		pm, err := infosync.GetTiFlashTableSyncProgress(context.TODO())
		require.NoError(t, err)
		_, ok := pm[tid]
		require.True(t, ok)
	}
	mustAbsent := func(tid int64) {
		pm, err := infosync.GetTiFlashTableSyncProgress(context.TODO())
		require.NoError(t, err)
		_, ok := pm[tid]
		require.False(t, ok)
	}
	_ = infosync.UpdateTiFlashTableSyncProgress(context.TODO(), tb.Meta().ID, 5.0)
	mustExist(tb.Meta().ID)
	_ = infosync.DeleteTiFlashTableSyncProgress(tb.Meta().ID)
	mustAbsent(tb.Meta().ID)

	_ = infosync.UpdateTiFlashTableSyncProgress(context.TODO(), tb.Meta().ID, 5.0)
	tk.MustExec("truncate table tiflash_d.t")
	mustAbsent(tb.Meta().ID)

	tb, _ = s.dom.InfoSchema().TableByName(model.NewCIStr("tiflash_d"), model.NewCIStr("t"))
	_ = infosync.UpdateTiFlashTableSyncProgress(context.TODO(), tb.Meta().ID, 5.0)
	tk.MustExec("drop table tiflash_d.t")
	mustAbsent(tb.Meta().ID)

	time.Sleep(100 * time.Millisecond)
}

func TestTiFlashGroupIndexWhenStartup(t *testing.T) {
	s, teardown := createTiFlashContext(t)
	tiflash := s.tiflash

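A note on the API the test exercises: GetTiFlashTableSyncProgress evidently returns a map keyed by table ID (the test indexes it with tb.Meta().ID), and UpdateTiFlashTableSyncProgress writes a numeric progress value for that ID. Under that assumption (the concrete value type is not visible in this diff), inspecting the tracked entries could look like the hypothetical helper below; it relies on the same imports as the test file (context, infosync, logutil, zap).

// dumpTiFlashProgress is a hypothetical debugging helper, not part of this
// commit. It assumes the map returned by GetTiFlashTableSyncProgress is keyed
// by int64 table IDs, as suggested by pm[tb.Meta().ID] in the test above.
func dumpTiFlashProgress(ctx context.Context) {
	pm, err := infosync.GetTiFlashTableSyncProgress(ctx)
	if err != nil {
		logutil.BgLogger().Error("GetTiFlashTableSyncProgress fails", zap.Error(err))
		return
	}
	for tableID, progress := range pm {
		logutil.BgLogger().Info("tiflash sync progress",
			zap.Int64("tableID", tableID), zap.Any("progress", progress))
	}
}
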
ddl/table.go
@@ -364,6 +364,12 @@ func onDropTableOrView(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error) {
			return ver, errors.Trace(err)
		}
	}
	if tblInfo.TiFlashReplica != nil {
		e := infosync.DeleteTiFlashTableSyncProgress(tblInfo.ID)
		if e != nil {
			logutil.BgLogger().Error("DeleteTiFlashTableSyncProgress fails", zap.Error(e))
		}
	}
	// Placement rules cannot be removed immediately after drop table / truncate table, because the
	// tables can be flashed back or recovered, therefore this is moved to doGCPlacementRules in gc_worker.go.

@@ -730,6 +736,10 @@ func onTruncateTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, _ error) {

	// Clear the TiFlash replica available status.
	if tblInfo.TiFlashReplica != nil {
		e := infosync.DeleteTiFlashTableSyncProgress(tblInfo.ID)
		if e != nil {
			logutil.BgLogger().Error("DeleteTiFlashTableSyncProgress fails", zap.Error(e))
		}
		// Set PD rules for TiFlash
		if pi := tblInfo.GetPartitionInfo(); pi != nil {
			if e := infosync.ConfigureTiFlashPDForPartitions(true, &pi.Definitions, tblInfo.TiFlashReplica.Count, &tblInfo.TiFlashReplica.LocationLabels, tblInfo.ID); e != nil {

@@ -1192,6 +1192,26 @@ func DeleteInternalSession(se interface{}) {
	sm.DeleteInternalSession(se)
}

// SetEtcdClient is only used for test.
func SetEtcdClient(etcdCli *clientv3.Client) {
	is, err := getGlobalInfoSyncer()

	if err != nil {
		return
	}
	is.etcdCli = etcdCli
}

// GetEtcdClient is only used for test.
func GetEtcdClient() *clientv3.Client {
	is, err := getGlobalInfoSyncer()

	if err != nil {
		return nil
	}
	return is.etcdCli
}

// GetPDScheduleConfig gets the schedule information from pd
func GetPDScheduleConfig(ctx context.Context) (map[string]interface{}, error) {
	is, err := getGlobalInfoSyncer()

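SetEtcdClient and GetEtcdClient exist purely so tests can point the global info syncer at an embedded etcd server. The lines below mirror how TestTiFlashProgress in this commit uses them: bring up a one-node cluster from go.etcd.io/etcd/tests/v3/integration, swap its client in for the duration of the test, and restore the original client on teardown.

	// Usage pattern taken from TestTiFlashProgress above.
	cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
	defer cluster.Terminate(t)

	save := infosync.GetEtcdClient()
	defer infosync.SetEtcdClient(save)
	infosync.SetEtcdClient(cluster.Client(0))
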
@@ -313,6 +313,7 @@ type MockTiFlash struct {
	PdEnabled    bool
	TiflashDelay time.Duration
	StartTime    time.Time
	NotAvailable bool
}

func (tiflash *MockTiFlash) setUpMockTiFlashHTTPServer() {
@@ -335,6 +336,10 @@ func (tiflash *MockTiFlash) setUpMockTiFlashHTTPServer() {
			return
		}
		table, ok := tiflash.SyncStatus[tableID]
		if tiflash.NotAvailable {
			// No region is available, so the table is not available.
			table.Regions = []int{}
		}
		if !ok {
			w.WriteHeader(http.StatusOK)
			_, _ = w.Write([]byte("0\n\n"))
@@ -364,6 +369,7 @@ func NewMockTiFlash() *MockTiFlash {
		PdEnabled:    true,
		TiflashDelay: 0,
		StartTime:    time.Now(),
		NotAvailable: false,
	}
	tiflash.setUpMockTiFlashHTTPServer()
	return tiflash

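The new NotAvailable flag is what keeps TestTiFlashProgress deterministic: when it is set, the mocked sync-status endpoint reports zero regions, so the replica never flips to available and the progress entry can only disappear through the DDL path under test. A rough, standalone sketch of that behaviour follows; the types and the response format (a region-count line followed by a region-id list line, hinted at by the "0\n\n" reply above) are assumptions, not the real MockTiFlash implementation, and it assumes net/http, fmt, strconv, and strings are imported.

// fakeTiFlash is a hypothetical, trimmed-down stand-in for MockTiFlash.
type fakeTiFlash struct {
	SyncStatus   map[int][]int // table ID -> region IDs
	NotAvailable bool
}

// handleSyncStatus sketches how the NotAvailable flag forces an "unavailable"
// reply. The assumed body format is "<region count>\n<region ids>\n".
func (f *fakeTiFlash) handleSyncStatus(w http.ResponseWriter, tableID int) {
	regions, ok := f.SyncStatus[tableID]
	if f.NotAvailable {
		// No region is available, so the table never becomes available.
		regions = nil
	}
	if !ok || len(regions) == 0 {
		w.WriteHeader(http.StatusOK)
		_, _ = w.Write([]byte("0\n\n"))
		return
	}
	ids := make([]string, 0, len(regions))
	for _, r := range regions {
		ids = append(ids, strconv.Itoa(r))
	}
	fmt.Fprintf(w, "%d\n%s\n", len(regions), strings.Join(ids, " "))
}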