diff --git a/pkg/domain/infosync/info.go b/pkg/domain/infosync/info.go index 50bb795a8c..d7c78fd852 100644 --- a/pkg/domain/infosync/info.go +++ b/pkg/domain/infosync/info.go @@ -86,6 +86,8 @@ const ( TopologyTiProxy = "/topology/tiproxy" // infoSuffix is the suffix of TiDB/TiProxy topology info. infoSuffix = "/info" + // TopologyTiCDC means address of TiCDC. + TopologyTiCDC = "/topology/ticdc" // TablePrometheusCacheExpiry is the expiry time for prometheus address cache. TablePrometheusCacheExpiry = 10 * time.Second // RequestRetryInterval is the sleep time before next retry for http request @@ -139,7 +141,7 @@ type ServerInfo struct { StartTimestamp int64 `json:"start_timestamp"` Labels map[string]string `json:"labels"` // ServerID is a function, to always retrieve latest serverID from `Domain`, - // which will be changed on occasions such as connection to PD is restored after broken. + // which will be changed on occasions such as connection to PD is restored after broken. ServerIDGetter func() uint64 `json:"-"` // JSONServerID is `serverID` for json marshal/unmarshal ONLY. @@ -1335,3 +1337,69 @@ func (is *InfoSyncer) getTiProxyServerInfo(ctx context.Context) (map[string]*TiP } return nil, errors.Trace(err) } + +// TiCDCInfo is the server info for TiCDC. +type TiCDCInfo struct { + ID string `json:"id"` + Address string `json:"address"` + Version string `json:"version"` + GitHash string `json:"git-hash"` + DeployPath string `json:"deploy-path"` + StartTimestamp int64 `json:"start-timestamp"` + ClusterID string `json:"cluster-id"` +} + +// GetTiCDCServerInfo gets all TiCDC servers information from etcd. +func GetTiCDCServerInfo(ctx context.Context) ([]*TiCDCInfo, error) { + is, err := getGlobalInfoSyncer() + if err != nil { + return nil, err + } + return is.getTiCDCServerInfo(ctx) +} + +func (is *InfoSyncer) getTiCDCServerInfo(ctx context.Context) ([]*TiCDCInfo, error) { + // In test. + if is.etcdCli == nil { + return nil, nil + } + + var err error + var resp *clientv3.GetResponse + allInfo := make([]*TiCDCInfo, 0) + for i := 0; i < keyOpDefaultRetryCnt; i++ { + if ctx.Err() != nil { + return nil, errors.Trace(ctx.Err()) + } + childCtx, cancel := context.WithTimeout(ctx, keyOpDefaultTimeout) + resp, err = is.etcdCli.Get(childCtx, TopologyTiCDC, clientv3.WithPrefix()) + cancel() + if err != nil { + logutil.BgLogger().Info("get key failed", zap.String("key", TopologyTiCDC), zap.Error(err)) + time.Sleep(200 * time.Millisecond) + continue + } + for _, kv := range resp.Kvs { + key := string(kv.Key) + keyParts := strings.Split(key, "/") + if len(keyParts) < 3 { + logutil.BgLogger().Info("invalid ticdc key", zap.String("key", key)) + continue + } + clusterID := keyParts[1] + + var info TiCDCInfo + err := json.Unmarshal(kv.Value, &info) + if err != nil { + logutil.BgLogger().Info("unmarshal key failed", zap.String("key", key), zap.ByteString("value", kv.Value), + zap.Error(err)) + return nil, errors.Trace(err) + } + info.Version = strings.TrimPrefix(info.Version, "v") + info.ClusterID = clusterID + allInfo = append(allInfo, &info) + } + return allInfo, nil + } + return nil, errors.Trace(err) +} diff --git a/pkg/executor/infoschema_cluster_table_test.go b/pkg/executor/infoschema_cluster_table_test.go index 992001bae9..89cbfc62ae 100644 --- a/pkg/executor/infoschema_cluster_table_test.go +++ b/pkg/executor/infoschema_cluster_table_test.go @@ -245,6 +245,7 @@ func TestTiDBClusterInfo(t *testing.T) { "tidb,127.0.0.1:11080," + mockAddr + ",mock-version,mock-githash,1001", "tikv,127.0.0.1:11080," + mockAddr + ",mock-version,mock-githash,0", "tiproxy,127.0.0.1:6000," + mockAddr + ",mock-version,mock-githash,0", + "ticdc,127.0.0.1:8300," + mockAddr + ",mock-version,mock-githash,0", } fpExpr := `return("` + strings.Join(instances, ";") + `")` require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/infoschema/mockClusterInfo", fpExpr)) @@ -256,6 +257,7 @@ func TestTiDBClusterInfo(t *testing.T) { row("tidb", "127.0.0.1:11080", mockAddr, "mock-version", "mock-githash", "1001"), row("tikv", "127.0.0.1:11080", mockAddr, "mock-version", "mock-githash", "0"), row("tiproxy", "127.0.0.1:6000", mockAddr, "mock-version", "mock-githash", "0"), + row("ticdc", "127.0.0.1:8300", mockAddr, "mock-version", "mock-githash", "0"), )) tk.MustQuery("select * from information_schema.cluster_config").Check(testkit.Rows( "pd 127.0.0.1:11080 key1 value1", @@ -279,9 +281,17 @@ func TestTiDBClusterInfo(t *testing.T) { "tikv 127.0.0.1:11080 key3.key4.nest4 n-value5", "tikv 127.0.0.1:11080 key3.nest1 n-value1", "tikv 127.0.0.1:11080 key3.nest2 n-value2", + "ticdc 127.0.0.1:8300 key1 value1", + "ticdc 127.0.0.1:8300 key2.nest1 n-value1", + "ticdc 127.0.0.1:8300 key2.nest2 n-value2", + "ticdc 127.0.0.1:8300 key3.key4.nest3 n-value4", + "ticdc 127.0.0.1:8300 key3.key4.nest4 n-value5", + "ticdc 127.0.0.1:8300 key3.nest1 n-value1", + "ticdc 127.0.0.1:8300 key3.nest2 n-value2", )) tk.MustQuery("select TYPE, `KEY`, VALUE from information_schema.cluster_config where `key`='key3.key4.nest4' order by type").Check(testkit.Rows( "pd key3.key4.nest4 n-value5", + "ticdc key3.key4.nest4 n-value5", "tidb key3.key4.nest4 n-value5", "tikv key3.key4.nest4 n-value5", )) diff --git a/pkg/executor/infoschema_reader_test.go b/pkg/executor/infoschema_reader_test.go index ff4d3aba26..8aca7b5ec0 100644 --- a/pkg/executor/infoschema_reader_test.go +++ b/pkg/executor/infoschema_reader_test.go @@ -45,6 +45,7 @@ func TestInspectionTables(t *testing.T) { "tidb,127.0.0.1:11080,127.0.0.1:10080,mock-version,mock-githash,1001", "tikv,127.0.0.1:11080,127.0.0.1:10080,mock-version,mock-githash,0", "tiproxy,127.0.0.1:6000,127.0.0.1:3380,mock-version,mock-githash,0", + "ticdc,127.0.0.1:8300,127.0.0.1:8301,mock-version,mock-githash,0", } fpName := "github.com/pingcap/tidb/pkg/infoschema/mockClusterInfo" fpExpr := `return("` + strings.Join(instances, ";") + `")` @@ -56,6 +57,7 @@ func TestInspectionTables(t *testing.T) { "tidb 127.0.0.1:11080 127.0.0.1:10080 mock-version mock-githash 1001", "tikv 127.0.0.1:11080 127.0.0.1:10080 mock-version mock-githash 0", "tiproxy 127.0.0.1:6000 127.0.0.1:3380 mock-version mock-githash 0", + "ticdc 127.0.0.1:8300 127.0.0.1:8301 mock-version mock-githash 0", )) // enable inspection mode @@ -66,9 +68,10 @@ func TestInspectionTables(t *testing.T) { "tidb 127.0.0.1:11080 127.0.0.1:10080 mock-version mock-githash 1001", "tikv 127.0.0.1:11080 127.0.0.1:10080 mock-version mock-githash 0", "tiproxy 127.0.0.1:6000 127.0.0.1:3380 mock-version mock-githash 0", + "ticdc 127.0.0.1:8300 127.0.0.1:8301 mock-version mock-githash 0", )) require.NoError(t, inspectionTableCache["cluster_info"].Err) - require.Len(t, inspectionTableCache["cluster_info"].Rows, 4) + require.Len(t, inspectionTableCache["cluster_info"].Rows, 5) // check whether is obtain data from cache at the next time inspectionTableCache["cluster_info"].Rows[0][0].SetString("modified-pd", mysql.DefaultCollationName) @@ -77,6 +80,7 @@ func TestInspectionTables(t *testing.T) { "tidb 127.0.0.1:11080 127.0.0.1:10080 mock-version mock-githash 1001", "tikv 127.0.0.1:11080 127.0.0.1:10080 mock-version mock-githash 0", "tiproxy 127.0.0.1:6000 127.0.0.1:3380 mock-version mock-githash 0", + "ticdc 127.0.0.1:8300 127.0.0.1:8301 mock-version mock-githash 0", )) tk.Session().GetSessionVars().InspectionTableCache = nil } diff --git a/pkg/executor/inspection_result_test.go b/pkg/executor/inspection_result_test.go index 29638352b7..a12f59ee93 100644 --- a/pkg/executor/inspection_result_test.go +++ b/pkg/executor/inspection_result_test.go @@ -480,7 +480,7 @@ func createClusterGRPCServer(t testing.TB) map[string]*testServer { testServers := map[string]*testServer{} // create gRPC servers - for _, typ := range []string{"tidb", "tikv", "tiproxy", "pd"} { + for _, typ := range []string{"tidb", "tikv", "ticdc", "tiproxy", "pd"} { tmpDir := t.TempDir() server := grpc.NewServer() diff --git a/pkg/executor/memtable_reader.go b/pkg/executor/memtable_reader.go index 773057202c..4b25a5d7fb 100644 --- a/pkg/executor/memtable_reader.go +++ b/pkg/executor/memtable_reader.go @@ -204,6 +204,8 @@ func fetchClusterConfig(sctx sessionctx.Context, nodeTypes, nodeAddrs set.String url = fmt.Sprintf("%s://%s/config", util.InternalHTTPSchema(), statusAddr) case "tiproxy": url = fmt.Sprintf("%s://%s/api/admin/config?format=json", util.InternalHTTPSchema(), statusAddr) + case "ticdc": + url = fmt.Sprintf("%s://%s/config", util.InternalHTTPSchema(), statusAddr) default: ch <- result{err: errors.Errorf("currently we do not support get config from node type: %s(%s)", typ, address)} return diff --git a/pkg/executor/memtable_reader_test.go b/pkg/executor/memtable_reader_test.go index 2d8acb2456..df5b4caf7a 100644 --- a/pkg/executor/memtable_reader_test.go +++ b/pkg/executor/memtable_reader_test.go @@ -534,6 +534,22 @@ func TestTiDBClusterLog(t *testing.T) { logtime(`2019/08/26 06:28:19.011`) + ` [critical] [test log message tiproxy 14, bar]`, }) + // TiCDC + writeTmpFile(t, testServers["ticdc"].tmpDir, "ticdc.log", []string{ + logtime(`2019/08/26 06:19:13.011`) + ` [INFO] [test log message ticdc 1, foo]`, + logtime(`2019/08/26 06:20:14.011`) + ` [DEBUG] [test log message ticdc 2, foo]`, + logtime(`2019/08/26 06:21:15.011`) + ` [error] [test log message ticdc 3, foo]`, + logtime(`2019/08/26 06:22:16.011`) + ` [trace] [test log message ticdc 4, foo]`, + logtime(`2019/08/26 06:23:17.011`) + ` [CRITICAL] [test log message ticdc 5, foo]`, + }) + writeTmpFile(t, testServers["ticdc"].tmpDir, "ticdc-1.log", []string{ + logtime(`2019/08/26 06:24:15.011`) + ` [info] [test log message ticdc 10, bar]`, + logtime(`2019/08/26 06:25:16.011`) + ` [debug] [test log message ticdc 11, bar]`, + logtime(`2019/08/26 06:26:17.011`) + ` [ERROR] [test log message ticdc 12, bar]`, + logtime(`2019/08/26 06:27:18.011`) + ` [TRACE] [test log message ticdc 13, bar]`, + logtime(`2019/08/26 06:28:19.011`) + ` [critical] [test log message ticdc 14, bar]`, + }) + // PD writeTmpFile(t, testServers["pd"].tmpDir, "pd.log", []string{ logtime(`2019/08/26 06:18:13.011`) + ` [INFO] [test log message pd 1, foo]`, @@ -552,6 +568,7 @@ func TestTiDBClusterLog(t *testing.T) { fullLogs := [][]string{ {"2019/08/26 06:18:13.011", "pd", "INFO", "[test log message pd 1, foo]"}, + {"2019/08/26 06:19:13.011", "ticdc", "INFO", "[test log message ticdc 1, foo]"}, {"2019/08/26 06:19:13.011", "tidb", "INFO", "[test log message tidb 1, foo]"}, {"2019/08/26 06:19:13.011", "tikv", "INFO", "[test log message tikv 1, foo]"}, {"2019/08/26 06:19:13.011", "tiproxy", "INFO", "[test log message tiproxy 1, foo]"}, @@ -560,35 +577,44 @@ func TestTiDBClusterLog(t *testing.T) { {"2019/08/26 06:19:15.011", "tidb", "error", "[test log message tidb 3, foo]"}, {"2019/08/26 06:19:16.011", "tidb", "trace", "[test log message tidb 4, foo]"}, {"2019/08/26 06:19:17.011", "tidb", "CRITICAL", "[test log message tidb 5, foo]"}, + {"2019/08/26 06:20:14.011", "ticdc", "DEBUG", "[test log message ticdc 2, foo]"}, {"2019/08/26 06:20:14.011", "tikv", "DEBUG", "[test log message tikv 2, foo]"}, {"2019/08/26 06:20:14.011", "tiproxy", "DEBUG", "[test log message tiproxy 2, foo]"}, {"2019/08/26 06:20:15.011", "pd", "error", "[test log message pd 3, foo]"}, + {"2019/08/26 06:21:15.011", "ticdc", "error", "[test log message ticdc 3, foo]"}, {"2019/08/26 06:21:15.011", "tikv", "error", "[test log message tikv 3, foo]"}, {"2019/08/26 06:21:15.011", "tiproxy", "error", "[test log message tiproxy 3, foo]"}, {"2019/08/26 06:21:16.011", "pd", "trace", "[test log message pd 4, foo]"}, + {"2019/08/26 06:22:16.011", "ticdc", "trace", "[test log message ticdc 4, foo]"}, {"2019/08/26 06:22:16.011", "tikv", "trace", "[test log message tikv 4, foo]"}, {"2019/08/26 06:22:16.011", "tiproxy", "trace", "[test log message tiproxy 4, foo]"}, {"2019/08/26 06:22:17.011", "pd", "CRITICAL", "[test log message pd 5, foo]"}, {"2019/08/26 06:23:13.011", "pd", "info", "[test log message pd 10, bar]"}, + {"2019/08/26 06:23:17.011", "ticdc", "CRITICAL", "[test log message ticdc 5, foo]"}, {"2019/08/26 06:23:17.011", "tikv", "CRITICAL", "[test log message tikv 5, foo]"}, {"2019/08/26 06:23:17.011", "tiproxy", "CRITICAL", "[test log message tiproxy 5, foo]"}, {"2019/08/26 06:24:14.011", "pd", "debug", "[test log message pd 11, bar]"}, + {"2019/08/26 06:24:15.011", "ticdc", "info", "[test log message ticdc 10, bar]"}, {"2019/08/26 06:24:15.011", "tikv", "info", "[test log message tikv 10, bar]"}, {"2019/08/26 06:24:15.011", "tiproxy", "info", "[test log message tiproxy 10, bar]"}, {"2019/08/26 06:25:13.011", "tidb", "info", "[test log message tidb 10, bar]"}, {"2019/08/26 06:25:14.011", "tidb", "debug", "[test log message tidb 11, bar]"}, {"2019/08/26 06:25:15.011", "pd", "ERROR", "[test log message pd 12, bar]"}, {"2019/08/26 06:25:15.011", "tidb", "ERROR", "[test log message tidb 12, bar]"}, + {"2019/08/26 06:25:16.011", "ticdc", "debug", "[test log message ticdc 11, bar]"}, {"2019/08/26 06:25:16.011", "tidb", "TRACE", "[test log message tidb 13, bar]"}, {"2019/08/26 06:25:16.011", "tikv", "debug", "[test log message tikv 11, bar]"}, {"2019/08/26 06:25:16.011", "tiproxy", "debug", "[test log message tiproxy 11, bar]"}, {"2019/08/26 06:25:17.011", "tidb", "critical", "[test log message tidb 14, bar]"}, {"2019/08/26 06:26:16.011", "pd", "TRACE", "[test log message pd 13, bar]"}, + {"2019/08/26 06:26:17.011", "ticdc", "ERROR", "[test log message ticdc 12, bar]"}, {"2019/08/26 06:26:17.011", "tikv", "ERROR", "[test log message tikv 12, bar]"}, {"2019/08/26 06:26:17.011", "tiproxy", "ERROR", "[test log message tiproxy 12, bar]"}, {"2019/08/26 06:27:17.011", "pd", "critical", "[test log message pd 14, bar]"}, + {"2019/08/26 06:27:18.011", "ticdc", "TRACE", "[test log message ticdc 13, bar]"}, {"2019/08/26 06:27:18.011", "tikv", "TRACE", "[test log message tikv 13, bar]"}, {"2019/08/26 06:27:18.011", "tiproxy", "TRACE", "[test log message tiproxy 13, bar]"}, + {"2019/08/26 06:28:19.011", "ticdc", "critical", "[test log message ticdc 14, bar]"}, {"2019/08/26 06:28:19.011", "tikv", "critical", "[test log message tikv 14, bar]"}, {"2019/08/26 06:28:19.011", "tiproxy", "critical", "[test log message tiproxy 14, bar]"}, } @@ -612,6 +638,7 @@ func TestTiDBClusterLog(t *testing.T) { "message like '%'", }, expected: [][]string{ + {"2019/08/26 06:19:13.011", "ticdc", "INFO", "[test log message ticdc 1, foo]"}, {"2019/08/26 06:19:13.011", "tidb", "INFO", "[test log message tidb 1, foo]"}, {"2019/08/26 06:19:13.011", "tikv", "INFO", "[test log message tikv 1, foo]"}, {"2019/08/26 06:19:13.011", "tiproxy", "INFO", "[test log message tiproxy 1, foo]"}, @@ -620,9 +647,11 @@ func TestTiDBClusterLog(t *testing.T) { {"2019/08/26 06:19:15.011", "tidb", "error", "[test log message tidb 3, foo]"}, {"2019/08/26 06:19:16.011", "tidb", "trace", "[test log message tidb 4, foo]"}, {"2019/08/26 06:19:17.011", "tidb", "CRITICAL", "[test log message tidb 5, foo]"}, + {"2019/08/26 06:20:14.011", "ticdc", "DEBUG", "[test log message ticdc 2, foo]"}, {"2019/08/26 06:20:14.011", "tikv", "DEBUG", "[test log message tikv 2, foo]"}, {"2019/08/26 06:20:14.011", "tiproxy", "DEBUG", "[test log message tiproxy 2, foo]"}, {"2019/08/26 06:20:15.011", "pd", "error", "[test log message pd 3, foo]"}, + {"2019/08/26 06:21:15.011", "ticdc", "error", "[test log message ticdc 3, foo]"}, {"2019/08/26 06:21:15.011", "tikv", "error", "[test log message tikv 3, foo]"}, {"2019/08/26 06:21:15.011", "tiproxy", "error", "[test log message tiproxy 3, foo]"}, }, @@ -759,10 +788,12 @@ func TestTiDBClusterLog(t *testing.T) { expected: [][]string{ {"2019/08/26 06:19:17.011", "tidb", "CRITICAL", "[test log message tidb 5, foo]"}, {"2019/08/26 06:22:17.011", "pd", "CRITICAL", "[test log message pd 5, foo]"}, + {"2019/08/26 06:23:17.011", "ticdc", "CRITICAL", "[test log message ticdc 5, foo]"}, {"2019/08/26 06:23:17.011", "tikv", "CRITICAL", "[test log message tikv 5, foo]"}, {"2019/08/26 06:23:17.011", "tiproxy", "CRITICAL", "[test log message tiproxy 5, foo]"}, {"2019/08/26 06:25:17.011", "tidb", "critical", "[test log message tidb 14, bar]"}, {"2019/08/26 06:27:17.011", "pd", "critical", "[test log message pd 14, bar]"}, + {"2019/08/26 06:28:19.011", "ticdc", "critical", "[test log message ticdc 14, bar]"}, {"2019/08/26 06:28:19.011", "tikv", "critical", "[test log message tikv 14, bar]"}, {"2019/08/26 06:28:19.011", "tiproxy", "critical", "[test log message tiproxy 14, bar]"}, }, diff --git a/pkg/infoschema/tables.go b/pkg/infoschema/tables.go index 715cd3be29..6dcba5401b 100644 --- a/pkg/infoschema/tables.go +++ b/pkg/infoschema/tables.go @@ -1804,11 +1804,12 @@ func GetClusterServerInfo(ctx sessionctx.Context) ([]ServerInfo, error) { }) type retriever func(ctx sessionctx.Context) ([]ServerInfo, error) + retrievers := []retriever{GetTiDBServerInfo, GetPDServerInfo, func(ctx sessionctx.Context) ([]ServerInfo, error) { + return GetStoreServerInfo(ctx.GetStore()) + }, GetTiProxyServerInfo, GetTiCDCServerInfo} //nolint: prealloc var servers []ServerInfo - for _, r := range []retriever{GetTiDBServerInfo, GetPDServerInfo, func(ctx sessionctx.Context) ([]ServerInfo, error) { - return GetStoreServerInfo(ctx.GetStore()) - }, GetTiProxyServerInfo} { + for _, r := range retrievers { nodes, err := r(ctx) if err != nil { return nil, err @@ -2084,6 +2085,26 @@ func GetTiProxyServerInfo(ctx sessionctx.Context) ([]ServerInfo, error) { return servers, nil } +// GetTiCDCServerInfo gets server info of TiCDC from PD. +func GetTiCDCServerInfo(ctx sessionctx.Context) ([]ServerInfo, error) { + ticdcNodes, err := infosync.GetTiCDCServerInfo(context.Background()) + if err != nil { + return nil, errors.Trace(err) + } + var servers = make([]ServerInfo, 0, len(ticdcNodes)) + for _, node := range ticdcNodes { + servers = append(servers, ServerInfo{ + ServerType: "ticdc", + Address: node.Address, + StatusAddr: node.Address, + Version: node.Version, + GitHash: node.GitHash, + StartTimestamp: node.StartTimestamp, + }) + } + return servers, nil +} + // SysVarHiddenForSem checks if a given sysvar is hidden according to SEM and privileges. func SysVarHiddenForSem(ctx sessionctx.Context, sysVarNameInLower string) bool { if !sem.IsEnabled() || !sem.IsInvisibleSysVar(sysVarNameInLower) {