Files
tidb/pkg/util/cdcutil/cdc.go

266 lines
7.8 KiB
Go

// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package cdcutil
import (
"bytes"
"context"
"encoding/json"
"fmt"
"math"
"path"
"regexp"
"strings"
"github.com/pingcap/errors"
"github.com/pingcap/log"
clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/zap"
)
const (
// CDCPrefix is the prefix of CDC information in etcd.
CDCPrefix = "/tidb/cdc/"
// ChangefeedPath is the path of changefeed information in etcd.
ChangefeedPath = "/changefeed/info/"
// CDCPrefixV61 is the prefix of CDC information in etcd.
CDCPrefixV61 = "/tidb/cdc/changefeed/info/"
)
type keyVersion int
const (
legacy keyVersion = 1
namespaced keyVersion = 2
invalidTs uint64 = math.MaxUint64
)
var (
// cluster id should be valid in
// https://github.com/pingcap/tiflow/blob/ca69c33948bea082aff9f4c0a357ace735b494ed/pkg/config/server_config.go#L218
clusterNameRe = regexp.MustCompile("^[a-zA-Z0-9]+(-[a-zA-Z0-9]+)*$")
)
type changefeed struct {
ID string
Cluster string
Namespace string
KeyVersion keyVersion
}
func (c *changefeed) infoKey() string {
switch c.KeyVersion {
case legacy:
return path.Join(CDCPrefix, "changefeed", "info", c.ID)
case namespaced:
return path.Join(CDCPrefix, c.Cluster, c.Namespace, "changefeed", "info", c.ID)
}
log.Panic("Invalid changefeed version type.", zap.Any("this", c))
return ""
}
func (c *changefeed) statusKey() string {
switch c.KeyVersion {
case legacy:
return path.Join(CDCPrefix, "changefeed", "status", c.ID)
case namespaced:
return path.Join(CDCPrefix, c.Cluster, c.Namespace, "changefeed", "status", c.ID)
}
log.Panic("Invalid changefeed version type.", zap.Any("this", c))
return ""
}
type checkCDCClient struct {
cli *clientv3.Client
}
func (c checkCDCClient) loadChangefeeds(ctx context.Context, out *[]changefeed) error {
resp, err := c.cli.Get(ctx, CDCPrefix, clientv3.WithPrefix(), clientv3.WithKeysOnly())
if err != nil {
return errors.Trace(err)
}
for _, kv := range resp.Kvs {
// example: /tidb/cdc/<clusterID>/<namespace>/changefeed/info/<changefeedID>
k := kv.Key[len(CDCPrefix)-1:]
// example of k( >6.1): /<clusterID>/<namespace>/changefeed/info/<changefeedID>
// example of k(<=6.1): /changefeed/info/<changefeedID>
clusterAndNamespace, changefeedID, found := bytes.Cut(k, []byte(ChangefeedPath))
if !found {
continue
}
if len(clusterAndNamespace) == 0 {
// They should be keys with format /tidb/cdc/changefeed/info.
*out = append(*out, changefeed{
ID: string(changefeedID),
KeyVersion: legacy,
})
continue
}
// example: clusterAndNamespace[1:] normally is <clusterID>/<namespace>
// but in migration scenario it become __backup__. we need handle it
// see https://github.com/pingcap/tiflow/issues/9807
clusterID, namespace, found := bytes.Cut(clusterAndNamespace[1:], []byte(`/`))
if !found {
// ignore __backup__ or other formats
continue
}
matched := clusterNameRe.Match(clusterID)
if !matched {
continue
}
*out = append(*out, changefeed{
ID: string(changefeedID),
Cluster: string(clusterID),
Namespace: string(namespace),
KeyVersion: namespaced,
})
}
return nil
}
type changefeedInfoView struct {
State string `json:"state"`
Start uint64 `json:"start-ts"`
}
type changefeedStatusView struct {
Checkpoint uint64 `json:"checkpoint-ts"`
}
func (c checkCDCClient) fetchCheckpointTSFromStatus(ctx context.Context, cf changefeed) (uint64, error) {
statusResp, err := c.cli.KV.Get(ctx, cf.statusKey())
if err != nil {
return 0, errors.Trace(err)
}
if statusResp.Count == 0 || len(statusResp.Kvs[0].Value) == 0 {
// The changefeed might was created recently, or just a phantom in test cases...
return 0, nil
}
var status changefeedStatusView
if err := json.Unmarshal(statusResp.Kvs[0].Value, &status); err != nil {
return 0, errors.Trace(err)
}
return status.Checkpoint, nil
}
func (c checkCDCClient) checkpointTSFor(ctx context.Context, cf changefeed) (uint64, error) {
infoResp, err := c.cli.KV.Get(ctx, cf.infoKey())
if err != nil {
return 0, errors.Trace(err)
}
if infoResp.Count == 0 {
// The changefeed have been removed.
return invalidTs, nil
}
var info changefeedInfoView
if err := json.Unmarshal(infoResp.Kvs[0].Value, &info); err != nil {
return 0, errors.Trace(err)
}
switch info.State {
// https://docs.pingcap.com/zh/tidb/stable/ticdc-changefeed-overview
case "finished":
return invalidTs, nil
case "failed", "running", "warning", "normal", "stopped", "error":
cts, err := c.fetchCheckpointTSFromStatus(ctx, cf)
if err != nil {
return 0, err
}
return max(cts, info.Start), nil
default:
// This changefeed may be noise, ignore it.
log.Warn("Ignoring invalid changefeed.", zap.Any("changefeed", cf), zap.String("state", info.State))
return invalidTs, nil
}
}
func (c checkCDCClient) getIncompatible(ctx context.Context, safeTS uint64) (*CDCNameSet, error) {
changefeeds := make([]changefeed, 0)
if err := c.loadChangefeeds(ctx, &changefeeds); err != nil {
return nil, err
}
nameset := new(CDCNameSet)
for _, cf := range changefeeds {
cts, err := c.checkpointTSFor(ctx, cf)
if err != nil {
return nil, errors.Annotatef(err, "failed to check changefeed %v", cf)
}
if cts < safeTS {
log.Info("Found incompatible changefeed.", zap.Any("changefeed", cf), zap.Uint64("checkpoint-ts", cts), zap.Uint64("safe-ts", safeTS))
nameset.save(cf)
}
}
return nameset, nil
}
// CDCNameSet saves CDC changefeed's information.
// nameSet maps `cluster/namespace` to `changefeed`s
type CDCNameSet struct {
changefeeds map[string][]string
}
func (s *CDCNameSet) save(cf changefeed) {
if s.changefeeds == nil {
s.changefeeds = map[string][]string{}
}
switch cf.KeyVersion {
case legacy:
s.changefeeds["<nil>"] = append(s.changefeeds["<nil>"], cf.ID)
case namespaced:
k := path.Join(cf.Cluster, cf.Namespace)
s.changefeeds[k] = append(s.changefeeds[k], cf.ID)
}
}
// Empty that the nameSet is empty means no changefeed exists.
func (s *CDCNameSet) Empty() bool {
return len(s.changefeeds) == 0
}
// MessageToUser convert the map `nameSet` to a readable message to user.
func (s *CDCNameSet) MessageToUser() string {
var changefeedMsgBuf strings.Builder
changefeedMsgBuf.WriteString("found CDC changefeed(s): ")
for clusterID, changefeedID := range s.changefeeds {
changefeedMsgBuf.WriteString("cluster/namespace: ")
changefeedMsgBuf.WriteString(clusterID)
changefeedMsgBuf.WriteString(" changefeed(s): ")
changefeedMsgBuf.WriteString(fmt.Sprintf("%v", changefeedID))
changefeedMsgBuf.WriteString(", ")
}
return changefeedMsgBuf.String()
}
// GetRunningChangefeeds gets CDC changefeed information and wraps them to a map
// for CDC >= v6.2, the etcd key format is /tidb/cdc/<clusterID>/<namespace>/changefeed/info/<changefeedID>
// for CDC <= v6.1, the etcd key format is /tidb/cdc/changefeed/info/<changefeedID>
func GetRunningChangefeeds(ctx context.Context, cli *clientv3.Client) (*CDCNameSet, error) {
checkCli := checkCDCClient{cli: cli}
return checkCli.getIncompatible(ctx, invalidTs)
}
// GetIncompatibleChangefeedsWithSafeTS gets CDC changefeed that may not compatible with the safe ts and wraps them to a map.
func GetIncompatibleChangefeedsWithSafeTS(ctx context.Context, cli *clientv3.Client, safeTS uint64) (*CDCNameSet, error) {
checkCli := checkCDCClient{cli: cli}
return checkCli.getIncompatible(ctx, safeTS)
}