From 29838f07daadb62ad74db45156ff0dbda3583b4b Mon Sep 17 00:00:00 2001 From: Mingyu Chen Date: Fri, 5 Nov 2021 09:43:06 +0800 Subject: [PATCH] [HTTP][API] Add backends info API for spark/flink connector (#6984) Doris should provide a http api to return backends list for connectors to submit stream load, and without privilege checking, which can let common user to use it --- docs/.vuepress/sidebar/en.js | 1 + docs/.vuepress/sidebar/zh-CN.js | 1 + .../http-actions/fe/backends-action.md | 70 +++++++++++ .../http-actions/fe/backends-action.md | 70 +++++++++++ .../apache/doris/flink/rest/RestService.java | 64 +++++++++- .../doris/flink/rest/models/Backend.java | 1 + .../doris/flink/rest/models/BackendRow.java | 1 + .../doris/flink/rest/models/BackendV2.java | 74 ++++++++++++ extension/spark-doris-connector/build.sh | 20 +++- extension/spark-doris-connector/pom_3.0.xml | 6 + .../apache/doris/spark/DorisStreamLoad.java | 2 +- .../apache/doris/spark/rest/RestService.java | 68 ++++++++++- .../doris/spark/rest/models/Backend.java | 1 + .../doris/spark/rest/models/BackendRow.java | 3 + .../doris/spark/rest/models/BackendV2.java | 72 +++++++++++ .../doris/spark/rest/TestRestService.java | 13 +- .../doris/spark/sql/TestSparkConnector.scala | 5 + .../doris/httpv2/rest/BackendsAction.java | 112 ++++++++++++++++++ 18 files changed, 570 insertions(+), 14 deletions(-) create mode 100644 docs/en/administrator-guide/http-actions/fe/backends-action.md create mode 100644 docs/zh-CN/administrator-guide/http-actions/fe/backends-action.md create mode 100644 extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/BackendV2.java create mode 100644 extension/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/models/BackendV2.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/BackendsAction.java diff --git a/docs/.vuepress/sidebar/en.js b/docs/.vuepress/sidebar/en.js index 39be10bed0..d9c2d53b6e 100644 --- a/docs/.vuepress/sidebar/en.js +++ b/docs/.vuepress/sidebar/en.js @@ -98,6 +98,7 @@ module.exports = [ "query-profile-action", ], }, + "backends-action", "bootstrap-action", "cancel-load-action", "check-decommission-action", diff --git a/docs/.vuepress/sidebar/zh-CN.js b/docs/.vuepress/sidebar/zh-CN.js index 837c04eda4..847784baa5 100644 --- a/docs/.vuepress/sidebar/zh-CN.js +++ b/docs/.vuepress/sidebar/zh-CN.js @@ -97,6 +97,7 @@ module.exports = [ "query-profile-action", ], }, + "backends-action", "bootstrap-action", "cancel-load-action", "check-decommission-action", diff --git a/docs/en/administrator-guide/http-actions/fe/backends-action.md b/docs/en/administrator-guide/http-actions/fe/backends-action.md new file mode 100644 index 0000000000..17589dd95d --- /dev/null +++ b/docs/en/administrator-guide/http-actions/fe/backends-action.md @@ -0,0 +1,70 @@ +--- +{ + "title": "Backends Action", + "language": "zh-CN" +} +--- + + + +# Backends Action + +## Request + +``` +GET /api/backends +``` + +## Description + +Backends Action returns the Backends list, including Backend's IP, PORT and other information. + +## Path parameters + +None + +## Query parameters + +* `is_alive` + + Optional parameters. Whether to return the surviving BE nodes. The default is false, which means that all BE nodes are returned. + +## Request body + +None + +## Response + +``` +{ + "msg": "success", + "code": 0, + "data": { + "backends": [ + { + "ip": "192.1.1.1", + "http_port": 8040, + "is_alive": true + } + ] + }, + "count": 0 +} +``` diff --git a/docs/zh-CN/administrator-guide/http-actions/fe/backends-action.md b/docs/zh-CN/administrator-guide/http-actions/fe/backends-action.md new file mode 100644 index 0000000000..3c76675cbf --- /dev/null +++ b/docs/zh-CN/administrator-guide/http-actions/fe/backends-action.md @@ -0,0 +1,70 @@ +--- +{ + "title": "Backends Action", + "language": "zh-CN" +} +--- + + + +# Backends Action + +## Request + +``` +GET /api/backends +``` + +## Description + +Backends Action 返回 Backends 列表,包括 Backend 的 IP、PORT 等信息。 + +## Path parameters + +无 + +## Query parameters + +* `is_alive` + + 可选参数。是否返回存活的 BE 节点。默认为false,即返回所有 BE 节点。 + +## Request body + +无 + +## Response + +``` +{ + "msg": "success", + "code": 0, + "data": { + "backends": [ + { + "ip": "192.1.1.1", + "http_port": 8040, + "is_alive": true + } + ] + }, + "count": 0 +} +``` diff --git a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java index 184afd34aa..1e6310cbda 100644 --- a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java +++ b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java @@ -32,6 +32,7 @@ import org.apache.doris.flink.exception.DorisException; import org.apache.doris.flink.exception.ShouldNeverHappenException; import org.apache.doris.flink.rest.models.Backend; import org.apache.doris.flink.rest.models.BackendRow; +import org.apache.doris.flink.rest.models.BackendV2; import org.apache.doris.flink.rest.models.QueryPlan; import org.apache.doris.flink.rest.models.Schema; import org.apache.doris.flink.rest.models.Tablet; @@ -83,7 +84,9 @@ public class RestService implements Serializable { private static final String API_PREFIX = "/api"; private static final String SCHEMA = "_schema"; private static final String QUERY_PLAN = "_query_plan"; + @Deprecated private static final String BACKENDS = "/rest/v1/system?path=//backends"; + private static final String BACKENDS_V2 = "/api/backends?is_aliva=true"; private static final String FE_LOGIN = "/rest/v1/login"; /** @@ -250,25 +253,29 @@ public class RestService implements Serializable { */ @VisibleForTesting public static String randomBackend(DorisOptions options, DorisReadOptions readOptions, Logger logger) throws DorisException, IOException { - List backends = getBackends(options, readOptions, logger); + List backends = getBackendsV2(options, readOptions, logger); logger.trace("Parse beNodes '{}'.", backends); if (backends == null || backends.isEmpty()) { logger.error(ILLEGAL_ARGUMENT_MESSAGE, "beNodes", backends); throw new IllegalArgumentException("beNodes", String.valueOf(backends)); } Collections.shuffle(backends); - BackendRow backend = backends.get(0); - return backend.getIP() + ":" + backend.getHttpPort(); + BackendV2.BackendRowV2 backend = backends.get(0); + return backend.getIp() + ":" + backend.getHttpPort(); } /** - * get Doris BE nodes to request. + * get Doris BE nodes to request. * * @param options configuration of request * @param logger slf4j logger * @return the chosen one Doris BE node * @throws IllegalArgumentException BE nodes is illegal + * + * This method is deprecated. Because it needs ADMIN_PRIV to get backends, which is not suitable for common users. + * Use getBackendsV2 instead */ + @Deprecated @VisibleForTesting static List getBackends(DorisOptions options, DorisReadOptions readOptions, Logger logger) throws DorisException, IOException { String feNodes = options.getFenodes(); @@ -281,6 +288,7 @@ public class RestService implements Serializable { return backends; } + @Deprecated static List parseBackend(String response, Logger logger) throws DorisException, IOException { ObjectMapper mapper = new ObjectMapper(); Backend backend; @@ -309,6 +317,54 @@ public class RestService implements Serializable { return backendRows; } + /** + * get Doris BE nodes to request. + * + * @param options configuration of request + * @param logger slf4j logger + * @return the chosen one Doris BE node + * @throws IllegalArgumentException BE nodes is illegal + */ + @VisibleForTesting + static List getBackendsV2(DorisOptions options, DorisReadOptions readOptions, Logger logger) throws DorisException, IOException { + String feNodes = options.getFenodes(); + String feNode = randomEndpoint(feNodes, logger); + String beUrl = "http://" + feNode + BACKENDS_V2; + HttpGet httpGet = new HttpGet(beUrl); + String response = send(options, readOptions, httpGet, logger); + logger.info("Backend Info:{}", response); + List backends = parseBackendV2(response, logger); + return backends; + } + + static List parseBackendV2(String response, Logger logger) throws DorisException, IOException { + ObjectMapper mapper = new ObjectMapper(); + BackendV2 backend; + try { + backend = mapper.readValue(response, BackendV2.class); + } catch (JsonParseException e) { + String errMsg = "Doris BE's response is not a json. res: " + response; + logger.error(errMsg, e); + throw new DorisException(errMsg, e); + } catch (JsonMappingException e) { + String errMsg = "Doris BE's response cannot map to schema. res: " + response; + logger.error(errMsg, e); + throw new DorisException(errMsg, e); + } catch (IOException e) { + String errMsg = "Parse Doris BE's response to json failed. res: " + response; + logger.error(errMsg, e); + throw new DorisException(errMsg, e); + } + + if (backend == null) { + logger.error(SHOULD_NOT_HAPPEN_MESSAGE); + throw new ShouldNeverHappenException(); + } + List backendRows = backend.getBackends(); + logger.debug("Parsing schema result is '{}'.", backendRows); + return backendRows; + } + /** * get a valid URI to connect Doris FE. * diff --git a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/Backend.java b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/Backend.java index d74e46f66f..d91614f442 100644 --- a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/Backend.java +++ b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/Backend.java @@ -25,6 +25,7 @@ import java.util.List; /** * Be response model **/ +@Deprecated @JsonIgnoreProperties(ignoreUnknown = true) public class Backend { diff --git a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/BackendRow.java b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/BackendRow.java index 5b7df996ad..3dd04710ae 100644 --- a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/BackendRow.java +++ b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/BackendRow.java @@ -20,6 +20,7 @@ package org.apache.doris.flink.rest.models; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.annotation.JsonProperty; +@Deprecated @JsonIgnoreProperties(ignoreUnknown = true) public class BackendRow { diff --git a/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/BackendV2.java b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/BackendV2.java new file mode 100644 index 0000000000..5efb85ec07 --- /dev/null +++ b/extension/flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/BackendV2.java @@ -0,0 +1,74 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.flink.rest.models; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.List; + +/** + * Be response model + **/ +@JsonIgnoreProperties(ignoreUnknown = true) +public class BackendV2 { + + @JsonProperty(value = "backends") + private List backends; + + public List getBackends() { + return backends; + } + + public void setBackends(List backends) { + this.backends = backends; + } + + public static class BackendRowV2 { + @JsonProperty("ip") + public String ip; + @JsonProperty("http_port") + public int httpPort; + @JsonProperty("is_alive") + public boolean isAlive; + + public String getIp() { + return ip; + } + + public void setIp(String ip) { + this.ip = ip; + } + + public int getHttpPort() { + return httpPort; + } + + public void setHttpPort(int httpPort) { + this.httpPort = httpPort; + } + + public boolean isAlive() { + return isAlive; + } + + public void setAlive(boolean alive) { + isAlive = alive; + } + } +} diff --git a/extension/spark-doris-connector/build.sh b/extension/spark-doris-connector/build.sh index b4ea0429a7..d6ba4351dc 100755 --- a/extension/spark-doris-connector/build.sh +++ b/extension/spark-doris-connector/build.sh @@ -39,7 +39,6 @@ fi # check maven MVN_CMD=mvn - if [[ ! -z ${CUSTOM_MVN} ]]; then MVN_CMD=${CUSTOM_MVN} fi @@ -48,11 +47,26 @@ if ! ${MVN_CMD} --version; then exit 1 fi export MVN_CMD -if [ $1 == 3 ] + +usage() { + echo " + Eg. + $0 2 build with spark 2.x + $0 3 build with spark 3.x + " + exit 1 +} + +if [ $# == 0 ]; then + usage +fi + + +if [ "$1"x == "3x" ] then ${MVN_CMD} clean package -f pom_3.0.xml fi -if [ $1 == 2 ] +if [ "$1"x == "2x" ] then ${MVN_CMD} clean package fi diff --git a/extension/spark-doris-connector/pom_3.0.xml b/extension/spark-doris-connector/pom_3.0.xml index 6c8eee564f..d208ad0528 100644 --- a/extension/spark-doris-connector/pom_3.0.xml +++ b/extension/spark-doris-connector/pom_3.0.xml @@ -161,6 +161,12 @@ 4.1.27.Final provided + + junit + junit + 4.12 + test + diff --git a/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java b/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java index 117825c639..4411fbc631 100644 --- a/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java +++ b/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/DorisStreamLoad.java @@ -75,7 +75,7 @@ public class DorisStreamLoad implements Serializable{ } public DorisStreamLoad(SparkSettings settings) throws IOException, DorisException { - String hostPort = RestService.randomBackend(settings, LOG); + String hostPort = RestService.randomBackendV2(settings, LOG); this.hostPort = hostPort; String[] dbTable = settings.getProperty(ConfigurationOptions.DORIS_TABLE_IDENTIFIER).split("\\."); this.db = dbTable[0]; diff --git a/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/RestService.java b/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/RestService.java index bb91538b7f..e8a295648a 100644 --- a/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/RestService.java +++ b/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/RestService.java @@ -62,6 +62,7 @@ import org.apache.doris.spark.exception.IllegalArgumentException; import org.apache.doris.spark.exception.ShouldNeverHappenException; import org.apache.doris.spark.rest.models.Backend; import org.apache.doris.spark.rest.models.BackendRow; +import org.apache.doris.spark.rest.models.BackendV2; import org.apache.doris.spark.rest.models.QueryPlan; import org.apache.doris.spark.rest.models.Schema; import org.apache.doris.spark.rest.models.Tablet; @@ -86,8 +87,9 @@ public class RestService implements Serializable { private static final String API_PREFIX = "/api"; private static final String SCHEMA = "_schema"; private static final String QUERY_PLAN = "_query_plan"; + @Deprecated private static final String BACKENDS = "/rest/v1/system?path=//backends"; - + private static final String BACKENDS_V2 = "/api/backends?is_alive=true"; /** * send request to Doris FE and get response json string. @@ -478,14 +480,17 @@ public class RestService implements Serializable { * @param logger slf4j logger * @return the chosen one Doris BE node * @throws IllegalArgumentException BE nodes is illegal + * Deprecated, use randomBackendV2 instead */ + @Deprecated + @VisibleForTesting public static String randomBackend(SparkSettings sparkSettings , Logger logger) throws DorisException, IOException { String feNodes = sparkSettings.getProperty(DORIS_FENODES); String feNode = randomEndpoint(feNodes, logger); - String beUrl = String.format("http://%s" + BACKENDS,feNode); + String beUrl = String.format("http://%s" + BACKENDS, feNode); HttpGet httpGet = new HttpGet(beUrl); - String response = send(sparkSettings,httpGet, logger); - logger.info("Backend Info:{}",response); + String response = send(sparkSettings, httpGet, logger); + logger.info("Backend Info:{}", response); List backends = parseBackend(response, logger); logger.trace("Parse beNodes '{}'.", backends); if (backends == null || backends.isEmpty()) { @@ -497,7 +502,6 @@ public class RestService implements Serializable { return backend.getIP() + ":" + backend.getHttpPort(); } - /** * translate Doris FE response to inner {@link BackendRow} struct. * @param response Doris FE response @@ -505,6 +509,7 @@ public class RestService implements Serializable { * @return inner {@link List} struct * @throws DorisException,IOException throw when translate failed * */ + @Deprecated @VisibleForTesting static List parseBackend(String response, Logger logger) throws DorisException, IOException { com.fasterxml.jackson.databind.ObjectMapper mapper = new com.fasterxml.jackson.databind.ObjectMapper(); @@ -534,6 +539,59 @@ public class RestService implements Serializable { return backendRows; } + /** + * choice a Doris BE node to request. + * @param logger slf4j logger + * @return the chosen one Doris BE node + * @throws IllegalArgumentException BE nodes is illegal + */ + @VisibleForTesting + public static String randomBackendV2(SparkSettings sparkSettings, Logger logger) throws DorisException { + String feNodes = sparkSettings.getProperty(DORIS_FENODES); + String feNode = randomEndpoint(feNodes, logger); + String beUrl = String.format("http://%s" + BACKENDS_V2, feNode); + HttpGet httpGet = new HttpGet(beUrl); + String response = send(sparkSettings, httpGet, logger); + logger.info("Backend Info:{}", response); + List backends = parseBackendV2(response, logger); + logger.trace("Parse beNodes '{}'.", backends); + if (backends == null || backends.isEmpty()) { + logger.error(ILLEGAL_ARGUMENT_MESSAGE, "beNodes", backends); + throw new IllegalArgumentException("beNodes", String.valueOf(backends)); + } + Collections.shuffle(backends); + BackendV2.BackendRowV2 backend = backends.get(0); + return backend.getIp() + ":" + backend.getHttpPort(); + } + + static List parseBackendV2(String response, Logger logger) throws DorisException { + com.fasterxml.jackson.databind.ObjectMapper mapper = new com.fasterxml.jackson.databind.ObjectMapper(); + BackendV2 backend; + try { + backend = mapper.readValue(response, BackendV2.class); + } catch (com.fasterxml.jackson.core.JsonParseException e) { + String errMsg = "Doris BE's response is not a json. res: " + response; + logger.error(errMsg, e); + throw new DorisException(errMsg, e); + } catch (com.fasterxml.jackson.databind.JsonMappingException e) { + String errMsg = "Doris BE's response cannot map to schema. res: " + response; + logger.error(errMsg, e); + throw new DorisException(errMsg, e); + } catch (IOException e) { + String errMsg = "Parse Doris BE's response to json failed. res: " + response; + logger.error(errMsg, e); + throw new DorisException(errMsg, e); + } + + if (backend == null) { + logger.error(SHOULD_NOT_HAPPEN_MESSAGE); + throw new ShouldNeverHappenException(); + } + List backendRows = backend.getBackends(); + logger.debug("Parsing schema result is '{}'.", backendRows); + return backendRows; + } + /** * translate BE tablets map to Doris RDD partition. * @param cfg configuration of request diff --git a/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/models/Backend.java b/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/models/Backend.java index 122e71ccc9..322202d220 100644 --- a/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/models/Backend.java +++ b/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/models/Backend.java @@ -23,6 +23,7 @@ import java.util.List; /** * Be response model **/ +@Deprecated @JsonIgnoreProperties(ignoreUnknown = true) public class Backend { diff --git a/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/models/BackendRow.java b/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/models/BackendRow.java index 0e2b38594f..a84ad2c76c 100644 --- a/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/models/BackendRow.java +++ b/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/models/BackendRow.java @@ -15,8 +15,11 @@ // specific language governing permissions and limitations // under the License. package org.apache.doris.spark.rest.models; + import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.annotation.JsonProperty; + +@Deprecated @JsonIgnoreProperties(ignoreUnknown = true) public class BackendRow { diff --git a/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/models/BackendV2.java b/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/models/BackendV2.java new file mode 100644 index 0000000000..75a251446a --- /dev/null +++ b/extension/spark-doris-connector/src/main/java/org/apache/doris/spark/rest/models/BackendV2.java @@ -0,0 +1,72 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +package org.apache.doris.spark.rest.models; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.List; + +/** + * Be response model + **/ +@JsonIgnoreProperties(ignoreUnknown = true) +public class BackendV2 { + + @JsonProperty(value = "backends") + private List backends; + + public List getBackends() { + return backends; + } + + public void setRows(List rows) { + this.backends = rows; + } + + public static class BackendRowV2 { + @JsonProperty("ip") + public String ip; + @JsonProperty("http_port") + public int httpPort; + @JsonProperty("is_alive") + public boolean isAlive; + + public String getIp() { + return ip; + } + + public void setIp(String ip) { + this.ip = ip; + } + + public int getHttpPort() { + return httpPort; + } + + public void setHttpPort(int httpPort) { + this.httpPort = httpPort; + } + + public boolean isAlive() { + return isAlive; + } + + public void setAlive(boolean alive) { + isAlive = alive; + } + } +} diff --git a/extension/spark-doris-connector/src/test/java/org/apache/doris/spark/rest/TestRestService.java b/extension/spark-doris-connector/src/test/java/org/apache/doris/spark/rest/TestRestService.java index 484be456b0..22d542a75d 100644 --- a/extension/spark-doris-connector/src/test/java/org/apache/doris/spark/rest/TestRestService.java +++ b/extension/spark-doris-connector/src/test/java/org/apache/doris/spark/rest/TestRestService.java @@ -38,6 +38,7 @@ import org.apache.doris.spark.cfg.Settings; import org.apache.doris.spark.exception.DorisException; import org.apache.doris.spark.exception.IllegalArgumentException; import org.apache.doris.spark.rest.models.BackendRow; +import org.apache.doris.spark.rest.models.BackendV2; import org.apache.doris.spark.rest.models.Field; import org.apache.doris.spark.rest.models.QueryPlan; import org.apache.doris.spark.rest.models.Schema; @@ -49,6 +50,8 @@ import org.junit.rules.ExpectedException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import jdk.nashorn.internal.ir.annotations.Ignore; + public class TestRestService { private static Logger logger = LoggerFactory.getLogger(TestRestService.class); @@ -295,7 +298,8 @@ public class TestRestService { Assert.assertEquals(expected, actual); } - @Test + @Deprecated + @Ignore public void testParseBackend() throws Exception { String response = "{\"href_columns\":[\"BackendId\"],\"parent_url\":\"/rest/v1/system?path=/\"," + "\"column_names\":[\"BackendId\",\"Cluster\",\"IP\",\"HostName\",\"HeartbeatPort\",\"BePort\"," + @@ -313,4 +317,11 @@ public class TestRestService { List backendRows = RestService.parseBackend(response, logger); Assert.assertTrue(backendRows != null && !backendRows.isEmpty()); } + + @Test + public void testParseBackendV2() throws Exception { + String response = "{\"backends\":[{\"ip\":\"192.168.1.1\",\"http_port\":8042,\"is_alive\":true}, {\"ip\":\"192.168.1.2\",\"http_port\":8042,\"is_alive\":true}]}"; + List backendRows = RestService.parseBackendV2(response, logger); + Assert.assertEquals(2, backendRows.size()); + } } diff --git a/extension/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/TestSparkConnector.scala b/extension/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/TestSparkConnector.scala index e0d39af7a8..be54aa966f 100644 --- a/extension/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/TestSparkConnector.scala +++ b/extension/spark-doris-connector/src/test/scala/org/apache/doris/spark/sql/TestSparkConnector.scala @@ -19,8 +19,12 @@ package org.apache.doris.spark.sql import org.apache.spark.sql.SparkSession import org.apache.spark.{SparkConf, SparkContext} +import org.junit.Ignore; import org.junit.Test +// This test need real connect info to run. +// Set the connect info before comment out this @Ignore +@Ignore class TestSparkConnector { val dorisFeNodes = "your_fe_host:8030" val dorisUser = "root" @@ -107,3 +111,4 @@ class TestSparkConnector { .start().awaitTermination() } } + diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/BackendsAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/BackendsAction.java new file mode 100644 index 0000000000..4d88c52697 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/BackendsAction.java @@ -0,0 +1,112 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.httpv2.rest; + +import org.apache.doris.catalog.Catalog; +import org.apache.doris.httpv2.entity.ResponseEntityBuilder; +import org.apache.doris.system.Backend; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Strings; +import com.google.common.collect.Lists; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestMethod; +import org.springframework.web.bind.annotation.RestController; + +import java.util.List; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import lombok.Getter; +import lombok.Setter; + +/** + * This class responsible for returning current backends info. + * Mainly used for flink/spark connector, which need backends info to execute stream load. + * It only requires password, no auth check. + *

+ * Response: + *

+ * { + * "msg": "success", + * "code": 0, + * "data": { + * "backends": [ + * { + * "ip": "192.1.1.1", + * "http_port": 8040, + * "is_alive": true + * } + * ] + * }, + * "count": 0 + * } + */ +@RestController +public class BackendsAction extends RestBaseController { + public static final Logger LOG = LogManager.getLogger(BackendsAction.class); + + private static final String IS_ALIVE = "is_alive"; + + @RequestMapping(path = "/api/backends", method = {RequestMethod.GET}) + public Object getBackends(HttpServletRequest request, HttpServletResponse response) { + executeCheckPassword(request, response); + + boolean needAlive = false; + String isAlive = request.getParameter(IS_ALIVE); + if (!Strings.isNullOrEmpty(isAlive) && isAlive.equalsIgnoreCase("true")) { + needAlive = true; + } + + BackendInfo backendInfo = new BackendInfo(); + backendInfo.backends = Lists.newArrayList(); + List beIds = Catalog.getCurrentSystemInfo().getBackendIds(needAlive); + for (Long beId : beIds) { + Backend be = Catalog.getCurrentSystemInfo().getBackend(beId); + if (be != null) { + BackendRow backendRow = new BackendRow(); + backendRow.ip = be.getHost(); + backendRow.httpPort = be.getHttpPort(); + backendRow.isAlive = be.isAlive(); + backendInfo.backends.add(backendRow); + } + } + return ResponseEntityBuilder.ok(backendInfo); + } + + @Getter + @Setter + public static class BackendInfo { + @JsonProperty("backends") + public List backends; + } + + @Getter + @Setter + public static class BackendRow { + @JsonProperty("ip") + public String ip; + @JsonProperty("http_port") + public int httpPort; + @JsonProperty("is_alive") + public boolean isAlive; + } +}