From 01ddb4cb8053969cd745cfbc0bc079523f89caa5 Mon Sep 17 00:00:00 2001 From: Jianjun Liao <36503113+Leavrth@users.noreply.github.com> Date: Thu, 23 Feb 2023 15:31:06 +0800 Subject: [PATCH] BR: move check_cdc to public pkg (#41505) close pingcap/tidb#41504 --- br/pkg/lightning/restore/precheck_impl.go | 60 +-------- .../lightning/restore/precheck_impl_test.go | 4 +- br/pkg/task/stream.go | 10 ++ br/pkg/utils/BUILD.bazel | 5 + br/pkg/utils/cdc.go | 127 ++++++++++++++++++ br/pkg/utils/cdc_test.go | 88 ++++++++++++ 6 files changed, 236 insertions(+), 58 deletions(-) create mode 100644 br/pkg/utils/cdc.go create mode 100644 br/pkg/utils/cdc_test.go diff --git a/br/pkg/lightning/restore/precheck_impl.go b/br/pkg/lightning/restore/precheck_impl.go index aff5dc7ae0..ed7c774dcb 100644 --- a/br/pkg/lightning/restore/precheck_impl.go +++ b/br/pkg/lightning/restore/precheck_impl.go @@ -14,7 +14,6 @@ package restore import ( - "bytes" "context" "encoding/json" "fmt" @@ -36,6 +35,7 @@ import ( "github.com/pingcap/tidb/br/pkg/lightning/mydump" "github.com/pingcap/tidb/br/pkg/storage" "github.com/pingcap/tidb/br/pkg/streamhelper" + "github.com/pingcap/tidb/br/pkg/utils" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/store/pdtypes" @@ -779,65 +779,13 @@ func (ci *CDCPITRCheckItem) Check(ctx context.Context) (*CheckResult, error) { errorMsg = append(errorMsg, fmt.Sprintf("found PiTR log streaming task(s): %v,", names)) } - // check etcd KV of CDC >= v6.2 - cdcPrefix := "/tidb/cdc/" - changefeedPath := []byte("/changefeed/info/") - - nameSet := make(map[string][]string, 1) - resp, err := ci.etcdCli.Get(ctx, cdcPrefix, clientv3.WithPrefix()) + nameSet, err := utils.GetCDCChangefeedNameSet(ctx, ci.etcdCli) if err != nil { return nil, errors.Trace(err) } - for _, kv := range resp.Kvs { - // example: /tidb/cdc///changefeed/info/ - k := kv.Key[len(cdcPrefix):] - clusterAndNamespace, changefeedID, found := bytes.Cut(k, changefeedPath) - if !found { - continue - } - if !isActiveCDCChangefeed(kv.Value) { - continue - } - nameSet[string(clusterAndNamespace)] = append(nameSet[string(clusterAndNamespace)], string(changefeedID)) - } - if len(nameSet) == 0 { - // check etcd KV of CDC <= v6.1 - cdcPrefixV61 := "/tidb/cdc/changefeed/info/" - resp, err = ci.etcdCli.Get(ctx, cdcPrefixV61, clientv3.WithPrefix()) - if err != nil { - return nil, errors.Trace(err) - } - for _, kv := range resp.Kvs { - // example: /tidb/cdc/changefeed/info/ - k := kv.Key[len(cdcPrefixV61):] - if len(k) == 0 { - continue - } - if !isActiveCDCChangefeed(kv.Value) { - continue - } - - nameSet[""] = append(nameSet[""], string(k)) - } - } - - if len(nameSet) > 0 { - var changefeedMsgBuf strings.Builder - changefeedMsgBuf.WriteString("found CDC changefeed(s): ") - isFirst := true - for clusterID, captureIDs := range nameSet { - if !isFirst { - changefeedMsgBuf.WriteString(", ") - } - isFirst = false - changefeedMsgBuf.WriteString("cluster/namespace: ") - changefeedMsgBuf.WriteString(clusterID) - changefeedMsgBuf.WriteString(" changefeed(s): ") - changefeedMsgBuf.WriteString(fmt.Sprintf("%v", captureIDs)) - } - changefeedMsgBuf.WriteString(",") - errorMsg = append(errorMsg, changefeedMsgBuf.String()) + if !nameSet.Empty() { + errorMsg = append(errorMsg, nameSet.MessageToUser()) } if len(errorMsg) > 0 { diff --git a/br/pkg/lightning/restore/precheck_impl_test.go b/br/pkg/lightning/restore/precheck_impl_test.go index 9d10c34b76..eda37f7fc8 100644 --- a/br/pkg/lightning/restore/precheck_impl_test.go +++ b/br/pkg/lightning/restore/precheck_impl_test.go @@ -650,7 +650,7 @@ func (s *precheckImplSuite) TestCDCPITRCheckItem() { s.Require().NoError(err) s.Require().False(result.Passed) s.Require().Equal("found PiTR log streaming task(s): [br_name],\n"+ - "found CDC changefeed(s): cluster/namespace: default/default changefeed(s): [test],\n"+ + "found CDC changefeed(s): cluster/namespace: default/default changefeed(s): [test], \n"+ "local backend is not compatible with them. Please switch to tidb backend then try again.", result.Message) @@ -671,7 +671,7 @@ func (s *precheckImplSuite) TestCDCPITRCheckItem() { s.Require().NoError(err) s.Require().False(result.Passed) s.Require().Equal("found PiTR log streaming task(s): [br_name],\n"+ - "found CDC changefeed(s): cluster/namespace: changefeed(s): [test],\n"+ + "found CDC changefeed(s): cluster/namespace: changefeed(s): [test], \n"+ "local backend is not compatible with them. Please switch to tidb backend then try again.", result.Message) diff --git a/br/pkg/task/stream.go b/br/pkg/task/stream.go index 40f851048d..789f6c00ac 100644 --- a/br/pkg/task/stream.go +++ b/br/pkg/task/stream.go @@ -1050,6 +1050,7 @@ func checkTaskExists(ctx context.Context, cfg *RestoreConfig) error { log.Error("failed to close the etcd client", zap.Error(err)) } }() + // check log backup task tasks, err := cli.GetAllTasks(ctx) if err != nil { return err @@ -1057,6 +1058,15 @@ func checkTaskExists(ctx context.Context, cfg *RestoreConfig) error { if len(tasks) > 0 { return errors.Errorf("log backup task is running: %s, please stop the task before restore, and after PITR operation finished, create log-backup task again and create a full backup on this cluster", tasks[0].Info.Name) } + + // check cdc changefeed + nameSet, err := utils.GetCDCChangefeedNameSet(ctx, etcdCLI) + if err != nil { + return err + } + if !nameSet.Empty() { + return errors.Errorf("%splease stop changefeed(s) before restore", nameSet.MessageToUser()) + } return nil } diff --git a/br/pkg/utils/BUILD.bazel b/br/pkg/utils/BUILD.bazel index 498400fc53..a453370d25 100644 --- a/br/pkg/utils/BUILD.bazel +++ b/br/pkg/utils/BUILD.bazel @@ -4,6 +4,7 @@ go_library( name = "utils", srcs = [ "backoff.go", + "cdc.go", "db.go", "dyn_pprof_other.go", "dyn_pprof_unix.go", @@ -47,6 +48,7 @@ go_library( "@com_github_pingcap_log//:log", "@com_github_tikv_client_go_v2//oracle", "@com_github_tikv_pd_client//:client", + "@io_etcd_go_etcd_client_v3//:client", "@org_golang_google_grpc//:grpc", "@org_golang_google_grpc//backoff", "@org_golang_google_grpc//codes", @@ -67,6 +69,7 @@ go_test( timeout = "short", srcs = [ "backoff_test.go", + "cdc_test.go", "db_test.go", "env_test.go", "json_test.go", @@ -103,6 +106,8 @@ go_test( "@com_github_pingcap_kvproto//pkg/encryptionpb", "@com_github_stretchr_testify//require", "@com_github_tikv_pd_client//:client", + "@io_etcd_go_etcd_client_v3//:client", + "@io_etcd_go_etcd_tests_v3//integration", "@org_golang_google_grpc//codes", "@org_golang_google_grpc//status", "@org_uber_go_goleak//:goleak", diff --git a/br/pkg/utils/cdc.go b/br/pkg/utils/cdc.go new file mode 100644 index 0000000000..ab655f826f --- /dev/null +++ b/br/pkg/utils/cdc.go @@ -0,0 +1,127 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package utils + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "strings" + + "github.com/pingcap/errors" + "github.com/pingcap/log" + clientv3 "go.etcd.io/etcd/client/v3" + "go.uber.org/zap" +) + +const ( + CDCPrefix = "/tidb/cdc/" + ChangefeedPath = "/changefeed/info/" + CDCPrefixV61 = "/tidb/cdc/changefeed/info/" +) + +// CDCNameSet saves CDC changefeed's information. +// nameSet maps `cluster/namespace` to `changefeed`s +type CDCNameSet struct { + nameSet map[string][]string +} + +// that the nameSet is empty means no changefeed exists. +func (s *CDCNameSet) Empty() bool { + return len(s.nameSet) == 0 +} + +// MessageToUser convert the map `nameSet` to a readable message to user. +func (s *CDCNameSet) MessageToUser() string { + var changefeedMsgBuf strings.Builder + changefeedMsgBuf.WriteString("found CDC changefeed(s): ") + for clusterID, captureIDs := range s.nameSet { + changefeedMsgBuf.WriteString("cluster/namespace: ") + changefeedMsgBuf.WriteString(clusterID) + changefeedMsgBuf.WriteString(" changefeed(s): ") + changefeedMsgBuf.WriteString(fmt.Sprintf("%v", captureIDs)) + changefeedMsgBuf.WriteString(", ") + } + return changefeedMsgBuf.String() +} + +// GetCDCChangefeedNameSet gets CDC changefeed information and wraps them to a map +// for CDC >= v6.2, the etcd key format is /tidb/cdc///changefeed/info/ +// for CDC <= v6.1, the etcd key format is /tidb/cdc/changefeed/info/ +func GetCDCChangefeedNameSet(ctx context.Context, cli *clientv3.Client) (*CDCNameSet, error) { + nameSet := make(map[string][]string, 1) + // check etcd KV of CDC >= v6.2 + resp, err := cli.Get(ctx, CDCPrefix, clientv3.WithPrefix()) + if err != nil { + return nil, errors.Trace(err) + } + + for _, kv := range resp.Kvs { + // example: /tidb/cdc///changefeed/info/ + k := kv.Key[len(CDCPrefix):] + clusterAndNamespace, changefeedID, found := bytes.Cut(k, []byte(ChangefeedPath)) + if !found { + continue + } + if !isActiveCDCChangefeed(kv.Value) { + continue + } + + nameSet[string(clusterAndNamespace)] = append(nameSet[string(clusterAndNamespace)], string(changefeedID)) + } + if len(nameSet) == 0 { + // check etcd KV of CDC <= v6.1 + resp, err = cli.Get(ctx, CDCPrefixV61, clientv3.WithPrefix()) + if err != nil { + return nil, errors.Trace(err) + } + for _, kv := range resp.Kvs { + // example: /tidb/cdc/changefeed/info/ + k := kv.Key[len(CDCPrefixV61):] + if len(k) == 0 { + continue + } + if !isActiveCDCChangefeed(kv.Value) { + continue + } + + nameSet[""] = append(nameSet[""], string(k)) + } + } + + return &CDCNameSet{nameSet}, nil +} + +type onlyState struct { + State string `json:"state"` +} + +func isActiveCDCChangefeed(jsonBytes []byte) bool { + s := onlyState{} + err := json.Unmarshal(jsonBytes, &s) + if err != nil { + // maybe a compatible issue, skip this key + log.L().Error("unmarshal etcd value failed when check CDC changefeed, will skip this key", + zap.ByteString("value", jsonBytes), + zap.Error(err)) + return false + } + switch s.State { + case "normal", "stopped", "error": + return true + default: + return false + } +} diff --git a/br/pkg/utils/cdc_test.go b/br/pkg/utils/cdc_test.go new file mode 100644 index 0000000000..1032693a0d --- /dev/null +++ b/br/pkg/utils/cdc_test.go @@ -0,0 +1,88 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package utils_test + +import ( + "context" + "testing" + + "github.com/pingcap/tidb/br/pkg/utils" + "github.com/stretchr/testify/require" + clientv3 "go.etcd.io/etcd/client/v3" + "go.etcd.io/etcd/tests/v3/integration" +) + +func TestGetCDCChangefeedNameSet(t *testing.T) { + integration.BeforeTestExternal(t) + testEtcdCluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + defer testEtcdCluster.Terminate(t) + + ctx := context.Background() + cli := testEtcdCluster.RandClient() + checkEtcdPut := func(key string, vals ...string) { + val := "" + if len(vals) == 1 { + val = vals[0] + } + _, err := cli.Put(ctx, key, val) + require.NoError(t, err) + } + + nameSet, err := utils.GetCDCChangefeedNameSet(ctx, cli) + require.NoError(t, err) + require.True(t, nameSet.Empty()) + + // TiCDC >= v6.2 + checkEtcdPut("/tidb/cdc/default/__cdc_meta__/capture/3ecd5c98-0148-4086-adfd-17641995e71f") + checkEtcdPut("/tidb/cdc/default/__cdc_meta__/meta/meta-version") + checkEtcdPut("/tidb/cdc/default/__cdc_meta__/meta/ticdc-delete-etcd-key-count") + checkEtcdPut("/tidb/cdc/default/__cdc_meta__/owner/22318498f4dd6639") + checkEtcdPut( + "/tidb/cdc/default/default/changefeed/info/test", + `{"upstream-id":7195826648407968958,"namespace":"default","changefeed-id":"test-1","sink-uri":"mysql://root@127.0.0.1:3306?time-zone=","create-time":"2023-02-03T15:23:34.773768+08:00","start-ts":439198420741652483,"target-ts":0,"admin-job-type":0,"sort-engine":"unified","sort-dir":"","config":{"memory-quota":1073741824,"case-sensitive":true,"enable-old-value":true,"force-replicate":false,"check-gc-safe-point":true,"enable-sync-point":false,"bdr-mode":false,"sync-point-interval":600000000000,"sync-point-retention":86400000000000,"filter":{"rules":["*.*"],"ignore-txn-start-ts":null,"event-filters":null},"mounter":{"worker-num":16},"sink":{"transaction-atomicity":"","protocol":"","dispatchers":null,"csv":{"delimiter":",","quote":"\"","null":"\\N","include-commit-ts":false},"column-selectors":null,"schema-registry":"","encoder-concurrency":16,"terminator":"\r\n","date-separator":"none","enable-partition-separator":false},"consistent":{"level":"none","max-log-size":64,"flush-interval":2000,"storage":""},"scheduler":{"region-per-span":0}},"state":"normal","error":null,"creator-version":"v6.5.0-master-dirty"}`, + ) + checkEtcdPut( + "/tidb/cdc/default/default/changefeed/info/test-1", + `{"upstream-id":7195826648407968958,"namespace":"default","changefeed-id":"test-1","sink-uri":"mysql://root@127.0.0.1:3306?time-zone=","create-time":"2023-02-03T15:23:34.773768+08:00","start-ts":439198420741652483,"target-ts":0,"admin-job-type":0,"sort-engine":"unified","sort-dir":"","config":{"memory-quota":1073741824,"case-sensitive":true,"enable-old-value":true,"force-replicate":false,"check-gc-safe-point":true,"enable-sync-point":false,"bdr-mode":false,"sync-point-interval":600000000000,"sync-point-retention":86400000000000,"filter":{"rules":["*.*"],"ignore-txn-start-ts":null,"event-filters":null},"mounter":{"worker-num":16},"sink":{"transaction-atomicity":"","protocol":"","dispatchers":null,"csv":{"delimiter":",","quote":"\"","null":"\\N","include-commit-ts":false},"column-selectors":null,"schema-registry":"","encoder-concurrency":16,"terminator":"\r\n","date-separator":"none","enable-partition-separator":false},"consistent":{"level":"none","max-log-size":64,"flush-interval":2000,"storage":""},"scheduler":{"region-per-span":0}},"state":"failed","error":null,"creator-version":"v6.5.0-master-dirty"}`, + ) + checkEtcdPut("/tidb/cdc/default/default/changefeed/status/test") + checkEtcdPut("/tidb/cdc/default/default/changefeed/status/test-1") + checkEtcdPut("/tidb/cdc/default/default/task/position/3ecd5c98-0148-4086-adfd-17641995e71f/test-1") + checkEtcdPut("/tidb/cdc/default/default/upstream/7168358383033671922") + + nameSet, err = utils.GetCDCChangefeedNameSet(ctx, cli) + require.NoError(t, err) + require.False(t, nameSet.Empty()) + require.Equal(t, "found CDC changefeed(s): cluster/namespace: default/default changefeed(s): [test], ", + nameSet.MessageToUser()) + + _, err = cli.Delete(ctx, "/tidb/cdc/", clientv3.WithPrefix()) + require.NoError(t, err) + + // TiCDC <= v6.1 + checkEtcdPut("/tidb/cdc/capture/f14cb04d-5ba1-410e-a59b-ccd796920e9d") + checkEtcdPut( + "/tidb/cdc/changefeed/info/test", + `{"upstream-id":7195826648407968958,"namespace":"default","changefeed-id":"test-1","sink-uri":"mysql://root@127.0.0.1:3306?time-zone=","create-time":"2023-02-03T15:23:34.773768+08:00","start-ts":439198420741652483,"target-ts":0,"admin-job-type":0,"sort-engine":"unified","sort-dir":"","config":{"memory-quota":1073741824,"case-sensitive":true,"enable-old-value":true,"force-replicate":false,"check-gc-safe-point":true,"enable-sync-point":false,"bdr-mode":false,"sync-point-interval":600000000000,"sync-point-retention":86400000000000,"filter":{"rules":["*.*"],"ignore-txn-start-ts":null,"event-filters":null},"mounter":{"worker-num":16},"sink":{"transaction-atomicity":"","protocol":"","dispatchers":null,"csv":{"delimiter":",","quote":"\"","null":"\\N","include-commit-ts":false},"column-selectors":null,"schema-registry":"","encoder-concurrency":16,"terminator":"\r\n","date-separator":"none","enable-partition-separator":false},"consistent":{"level":"none","max-log-size":64,"flush-interval":2000,"storage":""},"scheduler":{"region-per-span":0}},"state":"stopped","error":null,"creator-version":"v6.5.0-master-dirty"}`, + ) + checkEtcdPut("/tidb/cdc/job/test") + checkEtcdPut("/tidb/cdc/owner/223184ad80a88b0b") + checkEtcdPut("/tidb/cdc/task/position/f14cb04d-5ba1-410e-a59b-ccd796920e9d/test") + + nameSet, err = utils.GetCDCChangefeedNameSet(ctx, cli) + require.NoError(t, err) + require.False(t, nameSet.Empty()) + require.Equal(t, "found CDC changefeed(s): cluster/namespace: changefeed(s): [test], ", + nameSet.MessageToUser()) +}