executor, infoschema: add ticdc into table cluster_info (#50523)

close pingcap/tidb#50723
This commit is contained in:
CharlesCheung
2024-02-26 17:20:00 +08:00
committed by GitHub
parent 1438892b9f
commit efd52507ab
7 changed files with 142 additions and 6 deletions

View File

@ -86,6 +86,8 @@ const (
TopologyTiProxy = "/topology/tiproxy"
// infoSuffix is the suffix of TiDB/TiProxy topology info.
infoSuffix = "/info"
// TopologyTiCDC means address of TiCDC.
TopologyTiCDC = "/topology/ticdc"
// TablePrometheusCacheExpiry is the expiry time for prometheus address cache.
TablePrometheusCacheExpiry = 10 * time.Second
// RequestRetryInterval is the sleep time before next retry for http request
@ -139,7 +141,7 @@ type ServerInfo struct {
StartTimestamp int64 `json:"start_timestamp"`
Labels map[string]string `json:"labels"`
// ServerID is a function, to always retrieve latest serverID from `Domain`,
// which will be changed on occasions such as connection to PD is restored after broken.
// which will be changed on occasions such as connection to PD is restored after broken.
ServerIDGetter func() uint64 `json:"-"`
// JSONServerID is `serverID` for json marshal/unmarshal ONLY.
@ -1335,3 +1337,69 @@ func (is *InfoSyncer) getTiProxyServerInfo(ctx context.Context) (map[string]*TiP
}
return nil, errors.Trace(err)
}
// TiCDCInfo is the server info for TiCDC.
type TiCDCInfo struct {
ID string `json:"id"`
Address string `json:"address"`
Version string `json:"version"`
GitHash string `json:"git-hash"`
DeployPath string `json:"deploy-path"`
StartTimestamp int64 `json:"start-timestamp"`
ClusterID string `json:"cluster-id"`
}
// GetTiCDCServerInfo gets all TiCDC servers information from etcd.
func GetTiCDCServerInfo(ctx context.Context) ([]*TiCDCInfo, error) {
is, err := getGlobalInfoSyncer()
if err != nil {
return nil, err
}
return is.getTiCDCServerInfo(ctx)
}
func (is *InfoSyncer) getTiCDCServerInfo(ctx context.Context) ([]*TiCDCInfo, error) {
// In test.
if is.etcdCli == nil {
return nil, nil
}
var err error
var resp *clientv3.GetResponse
allInfo := make([]*TiCDCInfo, 0)
for i := 0; i < keyOpDefaultRetryCnt; i++ {
if ctx.Err() != nil {
return nil, errors.Trace(ctx.Err())
}
childCtx, cancel := context.WithTimeout(ctx, keyOpDefaultTimeout)
resp, err = is.etcdCli.Get(childCtx, TopologyTiCDC, clientv3.WithPrefix())
cancel()
if err != nil {
logutil.BgLogger().Info("get key failed", zap.String("key", TopologyTiCDC), zap.Error(err))
time.Sleep(200 * time.Millisecond)
continue
}
for _, kv := range resp.Kvs {
key := string(kv.Key)
keyParts := strings.Split(key, "/")
if len(keyParts) < 3 {
logutil.BgLogger().Info("invalid ticdc key", zap.String("key", key))
continue
}
clusterID := keyParts[1]
var info TiCDCInfo
err := json.Unmarshal(kv.Value, &info)
if err != nil {
logutil.BgLogger().Info("unmarshal key failed", zap.String("key", key), zap.ByteString("value", kv.Value),
zap.Error(err))
return nil, errors.Trace(err)
}
info.Version = strings.TrimPrefix(info.Version, "v")
info.ClusterID = clusterID
allInfo = append(allInfo, &info)
}
return allInfo, nil
}
return nil, errors.Trace(err)
}

View File

@ -245,6 +245,7 @@ func TestTiDBClusterInfo(t *testing.T) {
"tidb,127.0.0.1:11080," + mockAddr + ",mock-version,mock-githash,1001",
"tikv,127.0.0.1:11080," + mockAddr + ",mock-version,mock-githash,0",
"tiproxy,127.0.0.1:6000," + mockAddr + ",mock-version,mock-githash,0",
"ticdc,127.0.0.1:8300," + mockAddr + ",mock-version,mock-githash,0",
}
fpExpr := `return("` + strings.Join(instances, ";") + `")`
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/infoschema/mockClusterInfo", fpExpr))
@ -256,6 +257,7 @@ func TestTiDBClusterInfo(t *testing.T) {
row("tidb", "127.0.0.1:11080", mockAddr, "mock-version", "mock-githash", "1001"),
row("tikv", "127.0.0.1:11080", mockAddr, "mock-version", "mock-githash", "0"),
row("tiproxy", "127.0.0.1:6000", mockAddr, "mock-version", "mock-githash", "0"),
row("ticdc", "127.0.0.1:8300", mockAddr, "mock-version", "mock-githash", "0"),
))
tk.MustQuery("select * from information_schema.cluster_config").Check(testkit.Rows(
"pd 127.0.0.1:11080 key1 value1",
@ -279,9 +281,17 @@ func TestTiDBClusterInfo(t *testing.T) {
"tikv 127.0.0.1:11080 key3.key4.nest4 n-value5",
"tikv 127.0.0.1:11080 key3.nest1 n-value1",
"tikv 127.0.0.1:11080 key3.nest2 n-value2",
"ticdc 127.0.0.1:8300 key1 value1",
"ticdc 127.0.0.1:8300 key2.nest1 n-value1",
"ticdc 127.0.0.1:8300 key2.nest2 n-value2",
"ticdc 127.0.0.1:8300 key3.key4.nest3 n-value4",
"ticdc 127.0.0.1:8300 key3.key4.nest4 n-value5",
"ticdc 127.0.0.1:8300 key3.nest1 n-value1",
"ticdc 127.0.0.1:8300 key3.nest2 n-value2",
))
tk.MustQuery("select TYPE, `KEY`, VALUE from information_schema.cluster_config where `key`='key3.key4.nest4' order by type").Check(testkit.Rows(
"pd key3.key4.nest4 n-value5",
"ticdc key3.key4.nest4 n-value5",
"tidb key3.key4.nest4 n-value5",
"tikv key3.key4.nest4 n-value5",
))

View File

@ -45,6 +45,7 @@ func TestInspectionTables(t *testing.T) {
"tidb,127.0.0.1:11080,127.0.0.1:10080,mock-version,mock-githash,1001",
"tikv,127.0.0.1:11080,127.0.0.1:10080,mock-version,mock-githash,0",
"tiproxy,127.0.0.1:6000,127.0.0.1:3380,mock-version,mock-githash,0",
"ticdc,127.0.0.1:8300,127.0.0.1:8301,mock-version,mock-githash,0",
}
fpName := "github.com/pingcap/tidb/pkg/infoschema/mockClusterInfo"
fpExpr := `return("` + strings.Join(instances, ";") + `")`
@ -56,6 +57,7 @@ func TestInspectionTables(t *testing.T) {
"tidb 127.0.0.1:11080 127.0.0.1:10080 mock-version mock-githash 1001",
"tikv 127.0.0.1:11080 127.0.0.1:10080 mock-version mock-githash 0",
"tiproxy 127.0.0.1:6000 127.0.0.1:3380 mock-version mock-githash 0",
"ticdc 127.0.0.1:8300 127.0.0.1:8301 mock-version mock-githash 0",
))
// enable inspection mode
@ -66,9 +68,10 @@ func TestInspectionTables(t *testing.T) {
"tidb 127.0.0.1:11080 127.0.0.1:10080 mock-version mock-githash 1001",
"tikv 127.0.0.1:11080 127.0.0.1:10080 mock-version mock-githash 0",
"tiproxy 127.0.0.1:6000 127.0.0.1:3380 mock-version mock-githash 0",
"ticdc 127.0.0.1:8300 127.0.0.1:8301 mock-version mock-githash 0",
))
require.NoError(t, inspectionTableCache["cluster_info"].Err)
require.Len(t, inspectionTableCache["cluster_info"].Rows, 4)
require.Len(t, inspectionTableCache["cluster_info"].Rows, 5)
// check whether is obtain data from cache at the next time
inspectionTableCache["cluster_info"].Rows[0][0].SetString("modified-pd", mysql.DefaultCollationName)
@ -77,6 +80,7 @@ func TestInspectionTables(t *testing.T) {
"tidb 127.0.0.1:11080 127.0.0.1:10080 mock-version mock-githash 1001",
"tikv 127.0.0.1:11080 127.0.0.1:10080 mock-version mock-githash 0",
"tiproxy 127.0.0.1:6000 127.0.0.1:3380 mock-version mock-githash 0",
"ticdc 127.0.0.1:8300 127.0.0.1:8301 mock-version mock-githash 0",
))
tk.Session().GetSessionVars().InspectionTableCache = nil
}

View File

@ -480,7 +480,7 @@ func createClusterGRPCServer(t testing.TB) map[string]*testServer {
testServers := map[string]*testServer{}
// create gRPC servers
for _, typ := range []string{"tidb", "tikv", "tiproxy", "pd"} {
for _, typ := range []string{"tidb", "tikv", "ticdc", "tiproxy", "pd"} {
tmpDir := t.TempDir()
server := grpc.NewServer()

View File

@ -204,6 +204,8 @@ func fetchClusterConfig(sctx sessionctx.Context, nodeTypes, nodeAddrs set.String
url = fmt.Sprintf("%s://%s/config", util.InternalHTTPSchema(), statusAddr)
case "tiproxy":
url = fmt.Sprintf("%s://%s/api/admin/config?format=json", util.InternalHTTPSchema(), statusAddr)
case "ticdc":
url = fmt.Sprintf("%s://%s/config", util.InternalHTTPSchema(), statusAddr)
default:
ch <- result{err: errors.Errorf("currently we do not support get config from node type: %s(%s)", typ, address)}
return

View File

@ -534,6 +534,22 @@ func TestTiDBClusterLog(t *testing.T) {
logtime(`2019/08/26 06:28:19.011`) + ` [critical] [test log message tiproxy 14, bar]`,
})
// TiCDC
writeTmpFile(t, testServers["ticdc"].tmpDir, "ticdc.log", []string{
logtime(`2019/08/26 06:19:13.011`) + ` [INFO] [test log message ticdc 1, foo]`,
logtime(`2019/08/26 06:20:14.011`) + ` [DEBUG] [test log message ticdc 2, foo]`,
logtime(`2019/08/26 06:21:15.011`) + ` [error] [test log message ticdc 3, foo]`,
logtime(`2019/08/26 06:22:16.011`) + ` [trace] [test log message ticdc 4, foo]`,
logtime(`2019/08/26 06:23:17.011`) + ` [CRITICAL] [test log message ticdc 5, foo]`,
})
writeTmpFile(t, testServers["ticdc"].tmpDir, "ticdc-1.log", []string{
logtime(`2019/08/26 06:24:15.011`) + ` [info] [test log message ticdc 10, bar]`,
logtime(`2019/08/26 06:25:16.011`) + ` [debug] [test log message ticdc 11, bar]`,
logtime(`2019/08/26 06:26:17.011`) + ` [ERROR] [test log message ticdc 12, bar]`,
logtime(`2019/08/26 06:27:18.011`) + ` [TRACE] [test log message ticdc 13, bar]`,
logtime(`2019/08/26 06:28:19.011`) + ` [critical] [test log message ticdc 14, bar]`,
})
// PD
writeTmpFile(t, testServers["pd"].tmpDir, "pd.log", []string{
logtime(`2019/08/26 06:18:13.011`) + ` [INFO] [test log message pd 1, foo]`,
@ -552,6 +568,7 @@ func TestTiDBClusterLog(t *testing.T) {
fullLogs := [][]string{
{"2019/08/26 06:18:13.011", "pd", "INFO", "[test log message pd 1, foo]"},
{"2019/08/26 06:19:13.011", "ticdc", "INFO", "[test log message ticdc 1, foo]"},
{"2019/08/26 06:19:13.011", "tidb", "INFO", "[test log message tidb 1, foo]"},
{"2019/08/26 06:19:13.011", "tikv", "INFO", "[test log message tikv 1, foo]"},
{"2019/08/26 06:19:13.011", "tiproxy", "INFO", "[test log message tiproxy 1, foo]"},
@ -560,35 +577,44 @@ func TestTiDBClusterLog(t *testing.T) {
{"2019/08/26 06:19:15.011", "tidb", "error", "[test log message tidb 3, foo]"},
{"2019/08/26 06:19:16.011", "tidb", "trace", "[test log message tidb 4, foo]"},
{"2019/08/26 06:19:17.011", "tidb", "CRITICAL", "[test log message tidb 5, foo]"},
{"2019/08/26 06:20:14.011", "ticdc", "DEBUG", "[test log message ticdc 2, foo]"},
{"2019/08/26 06:20:14.011", "tikv", "DEBUG", "[test log message tikv 2, foo]"},
{"2019/08/26 06:20:14.011", "tiproxy", "DEBUG", "[test log message tiproxy 2, foo]"},
{"2019/08/26 06:20:15.011", "pd", "error", "[test log message pd 3, foo]"},
{"2019/08/26 06:21:15.011", "ticdc", "error", "[test log message ticdc 3, foo]"},
{"2019/08/26 06:21:15.011", "tikv", "error", "[test log message tikv 3, foo]"},
{"2019/08/26 06:21:15.011", "tiproxy", "error", "[test log message tiproxy 3, foo]"},
{"2019/08/26 06:21:16.011", "pd", "trace", "[test log message pd 4, foo]"},
{"2019/08/26 06:22:16.011", "ticdc", "trace", "[test log message ticdc 4, foo]"},
{"2019/08/26 06:22:16.011", "tikv", "trace", "[test log message tikv 4, foo]"},
{"2019/08/26 06:22:16.011", "tiproxy", "trace", "[test log message tiproxy 4, foo]"},
{"2019/08/26 06:22:17.011", "pd", "CRITICAL", "[test log message pd 5, foo]"},
{"2019/08/26 06:23:13.011", "pd", "info", "[test log message pd 10, bar]"},
{"2019/08/26 06:23:17.011", "ticdc", "CRITICAL", "[test log message ticdc 5, foo]"},
{"2019/08/26 06:23:17.011", "tikv", "CRITICAL", "[test log message tikv 5, foo]"},
{"2019/08/26 06:23:17.011", "tiproxy", "CRITICAL", "[test log message tiproxy 5, foo]"},
{"2019/08/26 06:24:14.011", "pd", "debug", "[test log message pd 11, bar]"},
{"2019/08/26 06:24:15.011", "ticdc", "info", "[test log message ticdc 10, bar]"},
{"2019/08/26 06:24:15.011", "tikv", "info", "[test log message tikv 10, bar]"},
{"2019/08/26 06:24:15.011", "tiproxy", "info", "[test log message tiproxy 10, bar]"},
{"2019/08/26 06:25:13.011", "tidb", "info", "[test log message tidb 10, bar]"},
{"2019/08/26 06:25:14.011", "tidb", "debug", "[test log message tidb 11, bar]"},
{"2019/08/26 06:25:15.011", "pd", "ERROR", "[test log message pd 12, bar]"},
{"2019/08/26 06:25:15.011", "tidb", "ERROR", "[test log message tidb 12, bar]"},
{"2019/08/26 06:25:16.011", "ticdc", "debug", "[test log message ticdc 11, bar]"},
{"2019/08/26 06:25:16.011", "tidb", "TRACE", "[test log message tidb 13, bar]"},
{"2019/08/26 06:25:16.011", "tikv", "debug", "[test log message tikv 11, bar]"},
{"2019/08/26 06:25:16.011", "tiproxy", "debug", "[test log message tiproxy 11, bar]"},
{"2019/08/26 06:25:17.011", "tidb", "critical", "[test log message tidb 14, bar]"},
{"2019/08/26 06:26:16.011", "pd", "TRACE", "[test log message pd 13, bar]"},
{"2019/08/26 06:26:17.011", "ticdc", "ERROR", "[test log message ticdc 12, bar]"},
{"2019/08/26 06:26:17.011", "tikv", "ERROR", "[test log message tikv 12, bar]"},
{"2019/08/26 06:26:17.011", "tiproxy", "ERROR", "[test log message tiproxy 12, bar]"},
{"2019/08/26 06:27:17.011", "pd", "critical", "[test log message pd 14, bar]"},
{"2019/08/26 06:27:18.011", "ticdc", "TRACE", "[test log message ticdc 13, bar]"},
{"2019/08/26 06:27:18.011", "tikv", "TRACE", "[test log message tikv 13, bar]"},
{"2019/08/26 06:27:18.011", "tiproxy", "TRACE", "[test log message tiproxy 13, bar]"},
{"2019/08/26 06:28:19.011", "ticdc", "critical", "[test log message ticdc 14, bar]"},
{"2019/08/26 06:28:19.011", "tikv", "critical", "[test log message tikv 14, bar]"},
{"2019/08/26 06:28:19.011", "tiproxy", "critical", "[test log message tiproxy 14, bar]"},
}
@ -612,6 +638,7 @@ func TestTiDBClusterLog(t *testing.T) {
"message like '%'",
},
expected: [][]string{
{"2019/08/26 06:19:13.011", "ticdc", "INFO", "[test log message ticdc 1, foo]"},
{"2019/08/26 06:19:13.011", "tidb", "INFO", "[test log message tidb 1, foo]"},
{"2019/08/26 06:19:13.011", "tikv", "INFO", "[test log message tikv 1, foo]"},
{"2019/08/26 06:19:13.011", "tiproxy", "INFO", "[test log message tiproxy 1, foo]"},
@ -620,9 +647,11 @@ func TestTiDBClusterLog(t *testing.T) {
{"2019/08/26 06:19:15.011", "tidb", "error", "[test log message tidb 3, foo]"},
{"2019/08/26 06:19:16.011", "tidb", "trace", "[test log message tidb 4, foo]"},
{"2019/08/26 06:19:17.011", "tidb", "CRITICAL", "[test log message tidb 5, foo]"},
{"2019/08/26 06:20:14.011", "ticdc", "DEBUG", "[test log message ticdc 2, foo]"},
{"2019/08/26 06:20:14.011", "tikv", "DEBUG", "[test log message tikv 2, foo]"},
{"2019/08/26 06:20:14.011", "tiproxy", "DEBUG", "[test log message tiproxy 2, foo]"},
{"2019/08/26 06:20:15.011", "pd", "error", "[test log message pd 3, foo]"},
{"2019/08/26 06:21:15.011", "ticdc", "error", "[test log message ticdc 3, foo]"},
{"2019/08/26 06:21:15.011", "tikv", "error", "[test log message tikv 3, foo]"},
{"2019/08/26 06:21:15.011", "tiproxy", "error", "[test log message tiproxy 3, foo]"},
},
@ -759,10 +788,12 @@ func TestTiDBClusterLog(t *testing.T) {
expected: [][]string{
{"2019/08/26 06:19:17.011", "tidb", "CRITICAL", "[test log message tidb 5, foo]"},
{"2019/08/26 06:22:17.011", "pd", "CRITICAL", "[test log message pd 5, foo]"},
{"2019/08/26 06:23:17.011", "ticdc", "CRITICAL", "[test log message ticdc 5, foo]"},
{"2019/08/26 06:23:17.011", "tikv", "CRITICAL", "[test log message tikv 5, foo]"},
{"2019/08/26 06:23:17.011", "tiproxy", "CRITICAL", "[test log message tiproxy 5, foo]"},
{"2019/08/26 06:25:17.011", "tidb", "critical", "[test log message tidb 14, bar]"},
{"2019/08/26 06:27:17.011", "pd", "critical", "[test log message pd 14, bar]"},
{"2019/08/26 06:28:19.011", "ticdc", "critical", "[test log message ticdc 14, bar]"},
{"2019/08/26 06:28:19.011", "tikv", "critical", "[test log message tikv 14, bar]"},
{"2019/08/26 06:28:19.011", "tiproxy", "critical", "[test log message tiproxy 14, bar]"},
},

View File

@ -1804,11 +1804,12 @@ func GetClusterServerInfo(ctx sessionctx.Context) ([]ServerInfo, error) {
})
type retriever func(ctx sessionctx.Context) ([]ServerInfo, error)
retrievers := []retriever{GetTiDBServerInfo, GetPDServerInfo, func(ctx sessionctx.Context) ([]ServerInfo, error) {
return GetStoreServerInfo(ctx.GetStore())
}, GetTiProxyServerInfo, GetTiCDCServerInfo}
//nolint: prealloc
var servers []ServerInfo
for _, r := range []retriever{GetTiDBServerInfo, GetPDServerInfo, func(ctx sessionctx.Context) ([]ServerInfo, error) {
return GetStoreServerInfo(ctx.GetStore())
}, GetTiProxyServerInfo} {
for _, r := range retrievers {
nodes, err := r(ctx)
if err != nil {
return nil, err
@ -2084,6 +2085,26 @@ func GetTiProxyServerInfo(ctx sessionctx.Context) ([]ServerInfo, error) {
return servers, nil
}
// GetTiCDCServerInfo gets server info of TiCDC from PD.
func GetTiCDCServerInfo(ctx sessionctx.Context) ([]ServerInfo, error) {
ticdcNodes, err := infosync.GetTiCDCServerInfo(context.Background())
if err != nil {
return nil, errors.Trace(err)
}
var servers = make([]ServerInfo, 0, len(ticdcNodes))
for _, node := range ticdcNodes {
servers = append(servers, ServerInfo{
ServerType: "ticdc",
Address: node.Address,
StatusAddr: node.Address,
Version: node.Version,
GitHash: node.GitHash,
StartTimestamp: node.StartTimestamp,
})
}
return servers, nil
}
// SysVarHiddenForSem checks if a given sysvar is hidden according to SEM and privileges.
func SysVarHiddenForSem(ctx sessionctx.Context, sysVarNameInLower string) bool {
if !sem.IsEnabled() || !sem.IsInvisibleSysVar(sysVarNameInLower) {