[HTTP][API] Add backends info API for spark/flink connector (#6984)
Doris should provide a http api to return backends list for connectors to submit stream load, and without privilege checking, which can let common user to use it
This commit is contained in:
@ -98,6 +98,7 @@ module.exports = [
|
||||
"query-profile-action",
|
||||
],
|
||||
},
|
||||
"backends-action",
|
||||
"bootstrap-action",
|
||||
"cancel-load-action",
|
||||
"check-decommission-action",
|
||||
|
||||
@ -97,6 +97,7 @@ module.exports = [
|
||||
"query-profile-action",
|
||||
],
|
||||
},
|
||||
"backends-action",
|
||||
"bootstrap-action",
|
||||
"cancel-load-action",
|
||||
"check-decommission-action",
|
||||
|
||||
@ -0,0 +1,70 @@
|
||||
---
|
||||
{
|
||||
"title": "Backends Action",
|
||||
"language": "zh-CN"
|
||||
}
|
||||
---
|
||||
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one
|
||||
or more contributor license agreements. See the NOTICE file
|
||||
distributed with this work for additional information
|
||||
regarding copyright ownership. The ASF licenses this file
|
||||
to you under the Apache License, Version 2.0 (the
|
||||
"License"); you may not use this file except in compliance
|
||||
with the License. You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing,
|
||||
software distributed under the License is distributed on an
|
||||
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
KIND, either express or implied. See the License for the
|
||||
specific language governing permissions and limitations
|
||||
under the License.
|
||||
-->
|
||||
|
||||
# Backends Action
|
||||
|
||||
## Request
|
||||
|
||||
```
|
||||
GET /api/backends
|
||||
```
|
||||
|
||||
## Description
|
||||
|
||||
Backends Action returns the Backends list, including Backend's IP, PORT and other information.
|
||||
|
||||
## Path parameters
|
||||
|
||||
None
|
||||
|
||||
## Query parameters
|
||||
|
||||
* `is_alive`
|
||||
|
||||
Optional parameters. Whether to return the surviving BE nodes. The default is false, which means that all BE nodes are returned.
|
||||
|
||||
## Request body
|
||||
|
||||
None
|
||||
|
||||
## Response
|
||||
|
||||
```
|
||||
{
|
||||
"msg": "success",
|
||||
"code": 0,
|
||||
"data": {
|
||||
"backends": [
|
||||
{
|
||||
"ip": "192.1.1.1",
|
||||
"http_port": 8040,
|
||||
"is_alive": true
|
||||
}
|
||||
]
|
||||
},
|
||||
"count": 0
|
||||
}
|
||||
```
|
||||
@ -0,0 +1,70 @@
|
||||
---
|
||||
{
|
||||
"title": "Backends Action",
|
||||
"language": "zh-CN"
|
||||
}
|
||||
---
|
||||
|
||||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one
|
||||
or more contributor license agreements. See the NOTICE file
|
||||
distributed with this work for additional information
|
||||
regarding copyright ownership. The ASF licenses this file
|
||||
to you under the Apache License, Version 2.0 (the
|
||||
"License"); you may not use this file except in compliance
|
||||
with the License. You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing,
|
||||
software distributed under the License is distributed on an
|
||||
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
KIND, either express or implied. See the License for the
|
||||
specific language governing permissions and limitations
|
||||
under the License.
|
||||
-->
|
||||
|
||||
# Backends Action
|
||||
|
||||
## Request
|
||||
|
||||
```
|
||||
GET /api/backends
|
||||
```
|
||||
|
||||
## Description
|
||||
|
||||
Backends Action 返回 Backends 列表,包括 Backend 的 IP、PORT 等信息。
|
||||
|
||||
## Path parameters
|
||||
|
||||
无
|
||||
|
||||
## Query parameters
|
||||
|
||||
* `is_alive`
|
||||
|
||||
可选参数。是否返回存活的 BE 节点。默认为false,即返回所有 BE 节点。
|
||||
|
||||
## Request body
|
||||
|
||||
无
|
||||
|
||||
## Response
|
||||
|
||||
```
|
||||
{
|
||||
"msg": "success",
|
||||
"code": 0,
|
||||
"data": {
|
||||
"backends": [
|
||||
{
|
||||
"ip": "192.1.1.1",
|
||||
"http_port": 8040,
|
||||
"is_alive": true
|
||||
}
|
||||
]
|
||||
},
|
||||
"count": 0
|
||||
}
|
||||
```
|
||||
@ -32,6 +32,7 @@ import org.apache.doris.flink.exception.DorisException;
|
||||
import org.apache.doris.flink.exception.ShouldNeverHappenException;
|
||||
import org.apache.doris.flink.rest.models.Backend;
|
||||
import org.apache.doris.flink.rest.models.BackendRow;
|
||||
import org.apache.doris.flink.rest.models.BackendV2;
|
||||
import org.apache.doris.flink.rest.models.QueryPlan;
|
||||
import org.apache.doris.flink.rest.models.Schema;
|
||||
import org.apache.doris.flink.rest.models.Tablet;
|
||||
@ -83,7 +84,9 @@ public class RestService implements Serializable {
|
||||
private static final String API_PREFIX = "/api";
|
||||
private static final String SCHEMA = "_schema";
|
||||
private static final String QUERY_PLAN = "_query_plan";
|
||||
@Deprecated
|
||||
private static final String BACKENDS = "/rest/v1/system?path=//backends";
|
||||
private static final String BACKENDS_V2 = "/api/backends?is_aliva=true";
|
||||
private static final String FE_LOGIN = "/rest/v1/login";
|
||||
|
||||
/**
|
||||
@ -250,25 +253,29 @@ public class RestService implements Serializable {
|
||||
*/
|
||||
@VisibleForTesting
|
||||
public static String randomBackend(DorisOptions options, DorisReadOptions readOptions, Logger logger) throws DorisException, IOException {
|
||||
List<BackendRow> backends = getBackends(options, readOptions, logger);
|
||||
List<BackendV2.BackendRowV2> backends = getBackendsV2(options, readOptions, logger);
|
||||
logger.trace("Parse beNodes '{}'.", backends);
|
||||
if (backends == null || backends.isEmpty()) {
|
||||
logger.error(ILLEGAL_ARGUMENT_MESSAGE, "beNodes", backends);
|
||||
throw new IllegalArgumentException("beNodes", String.valueOf(backends));
|
||||
}
|
||||
Collections.shuffle(backends);
|
||||
BackendRow backend = backends.get(0);
|
||||
return backend.getIP() + ":" + backend.getHttpPort();
|
||||
BackendV2.BackendRowV2 backend = backends.get(0);
|
||||
return backend.getIp() + ":" + backend.getHttpPort();
|
||||
}
|
||||
|
||||
/**
|
||||
* get Doris BE nodes to request.
|
||||
* get Doris BE nodes to request.
|
||||
*
|
||||
* @param options configuration of request
|
||||
* @param logger slf4j logger
|
||||
* @return the chosen one Doris BE node
|
||||
* @throws IllegalArgumentException BE nodes is illegal
|
||||
*
|
||||
* This method is deprecated. Because it needs ADMIN_PRIV to get backends, which is not suitable for common users.
|
||||
* Use getBackendsV2 instead
|
||||
*/
|
||||
@Deprecated
|
||||
@VisibleForTesting
|
||||
static List<BackendRow> getBackends(DorisOptions options, DorisReadOptions readOptions, Logger logger) throws DorisException, IOException {
|
||||
String feNodes = options.getFenodes();
|
||||
@ -281,6 +288,7 @@ public class RestService implements Serializable {
|
||||
return backends;
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
static List<BackendRow> parseBackend(String response, Logger logger) throws DorisException, IOException {
|
||||
ObjectMapper mapper = new ObjectMapper();
|
||||
Backend backend;
|
||||
@ -309,6 +317,54 @@ public class RestService implements Serializable {
|
||||
return backendRows;
|
||||
}
|
||||
|
||||
/**
|
||||
* get Doris BE nodes to request.
|
||||
*
|
||||
* @param options configuration of request
|
||||
* @param logger slf4j logger
|
||||
* @return the chosen one Doris BE node
|
||||
* @throws IllegalArgumentException BE nodes is illegal
|
||||
*/
|
||||
@VisibleForTesting
|
||||
static List<BackendV2.BackendRowV2> getBackendsV2(DorisOptions options, DorisReadOptions readOptions, Logger logger) throws DorisException, IOException {
|
||||
String feNodes = options.getFenodes();
|
||||
String feNode = randomEndpoint(feNodes, logger);
|
||||
String beUrl = "http://" + feNode + BACKENDS_V2;
|
||||
HttpGet httpGet = new HttpGet(beUrl);
|
||||
String response = send(options, readOptions, httpGet, logger);
|
||||
logger.info("Backend Info:{}", response);
|
||||
List<BackendV2.BackendRowV2> backends = parseBackendV2(response, logger);
|
||||
return backends;
|
||||
}
|
||||
|
||||
static List<BackendV2.BackendRowV2> parseBackendV2(String response, Logger logger) throws DorisException, IOException {
|
||||
ObjectMapper mapper = new ObjectMapper();
|
||||
BackendV2 backend;
|
||||
try {
|
||||
backend = mapper.readValue(response, BackendV2.class);
|
||||
} catch (JsonParseException e) {
|
||||
String errMsg = "Doris BE's response is not a json. res: " + response;
|
||||
logger.error(errMsg, e);
|
||||
throw new DorisException(errMsg, e);
|
||||
} catch (JsonMappingException e) {
|
||||
String errMsg = "Doris BE's response cannot map to schema. res: " + response;
|
||||
logger.error(errMsg, e);
|
||||
throw new DorisException(errMsg, e);
|
||||
} catch (IOException e) {
|
||||
String errMsg = "Parse Doris BE's response to json failed. res: " + response;
|
||||
logger.error(errMsg, e);
|
||||
throw new DorisException(errMsg, e);
|
||||
}
|
||||
|
||||
if (backend == null) {
|
||||
logger.error(SHOULD_NOT_HAPPEN_MESSAGE);
|
||||
throw new ShouldNeverHappenException();
|
||||
}
|
||||
List<BackendV2.BackendRowV2> backendRows = backend.getBackends();
|
||||
logger.debug("Parsing schema result is '{}'.", backendRows);
|
||||
return backendRows;
|
||||
}
|
||||
|
||||
/**
|
||||
* get a valid URI to connect Doris FE.
|
||||
*
|
||||
|
||||
@ -25,6 +25,7 @@ import java.util.List;
|
||||
/**
|
||||
* Be response model
|
||||
**/
|
||||
@Deprecated
|
||||
@JsonIgnoreProperties(ignoreUnknown = true)
|
||||
public class Backend {
|
||||
|
||||
|
||||
@ -20,6 +20,7 @@ package org.apache.doris.flink.rest.models;
|
||||
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
|
||||
@Deprecated
|
||||
@JsonIgnoreProperties(ignoreUnknown = true)
|
||||
public class BackendRow {
|
||||
|
||||
|
||||
@ -0,0 +1,74 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.flink.rest.models;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Be response model
|
||||
**/
|
||||
@JsonIgnoreProperties(ignoreUnknown = true)
|
||||
public class BackendV2 {
|
||||
|
||||
@JsonProperty(value = "backends")
|
||||
private List<BackendRowV2> backends;
|
||||
|
||||
public List<BackendRowV2> getBackends() {
|
||||
return backends;
|
||||
}
|
||||
|
||||
public void setBackends(List<BackendRowV2> backends) {
|
||||
this.backends = backends;
|
||||
}
|
||||
|
||||
public static class BackendRowV2 {
|
||||
@JsonProperty("ip")
|
||||
public String ip;
|
||||
@JsonProperty("http_port")
|
||||
public int httpPort;
|
||||
@JsonProperty("is_alive")
|
||||
public boolean isAlive;
|
||||
|
||||
public String getIp() {
|
||||
return ip;
|
||||
}
|
||||
|
||||
public void setIp(String ip) {
|
||||
this.ip = ip;
|
||||
}
|
||||
|
||||
public int getHttpPort() {
|
||||
return httpPort;
|
||||
}
|
||||
|
||||
public void setHttpPort(int httpPort) {
|
||||
this.httpPort = httpPort;
|
||||
}
|
||||
|
||||
public boolean isAlive() {
|
||||
return isAlive;
|
||||
}
|
||||
|
||||
public void setAlive(boolean alive) {
|
||||
isAlive = alive;
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -39,7 +39,6 @@ fi
|
||||
# check maven
|
||||
MVN_CMD=mvn
|
||||
|
||||
|
||||
if [[ ! -z ${CUSTOM_MVN} ]]; then
|
||||
MVN_CMD=${CUSTOM_MVN}
|
||||
fi
|
||||
@ -48,11 +47,26 @@ if ! ${MVN_CMD} --version; then
|
||||
exit 1
|
||||
fi
|
||||
export MVN_CMD
|
||||
if [ $1 == 3 ]
|
||||
|
||||
usage() {
|
||||
echo "
|
||||
Eg.
|
||||
$0 2 build with spark 2.x
|
||||
$0 3 build with spark 3.x
|
||||
"
|
||||
exit 1
|
||||
}
|
||||
|
||||
if [ $# == 0 ]; then
|
||||
usage
|
||||
fi
|
||||
|
||||
|
||||
if [ "$1"x == "3x" ]
|
||||
then
|
||||
${MVN_CMD} clean package -f pom_3.0.xml
|
||||
fi
|
||||
if [ $1 == 2 ]
|
||||
if [ "$1"x == "2x" ]
|
||||
then
|
||||
${MVN_CMD} clean package
|
||||
fi
|
||||
|
||||
@ -161,6 +161,12 @@
|
||||
<version>4.1.27.Final</version>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>junit</groupId>
|
||||
<artifactId>junit</artifactId>
|
||||
<version>4.12</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
</dependencies>
|
||||
|
||||
|
||||
@ -75,7 +75,7 @@ public class DorisStreamLoad implements Serializable{
|
||||
}
|
||||
|
||||
public DorisStreamLoad(SparkSettings settings) throws IOException, DorisException {
|
||||
String hostPort = RestService.randomBackend(settings, LOG);
|
||||
String hostPort = RestService.randomBackendV2(settings, LOG);
|
||||
this.hostPort = hostPort;
|
||||
String[] dbTable = settings.getProperty(ConfigurationOptions.DORIS_TABLE_IDENTIFIER).split("\\.");
|
||||
this.db = dbTable[0];
|
||||
|
||||
@ -62,6 +62,7 @@ import org.apache.doris.spark.exception.IllegalArgumentException;
|
||||
import org.apache.doris.spark.exception.ShouldNeverHappenException;
|
||||
import org.apache.doris.spark.rest.models.Backend;
|
||||
import org.apache.doris.spark.rest.models.BackendRow;
|
||||
import org.apache.doris.spark.rest.models.BackendV2;
|
||||
import org.apache.doris.spark.rest.models.QueryPlan;
|
||||
import org.apache.doris.spark.rest.models.Schema;
|
||||
import org.apache.doris.spark.rest.models.Tablet;
|
||||
@ -86,8 +87,9 @@ public class RestService implements Serializable {
|
||||
private static final String API_PREFIX = "/api";
|
||||
private static final String SCHEMA = "_schema";
|
||||
private static final String QUERY_PLAN = "_query_plan";
|
||||
@Deprecated
|
||||
private static final String BACKENDS = "/rest/v1/system?path=//backends";
|
||||
|
||||
private static final String BACKENDS_V2 = "/api/backends?is_alive=true";
|
||||
|
||||
/**
|
||||
* send request to Doris FE and get response json string.
|
||||
@ -478,14 +480,17 @@ public class RestService implements Serializable {
|
||||
* @param logger slf4j logger
|
||||
* @return the chosen one Doris BE node
|
||||
* @throws IllegalArgumentException BE nodes is illegal
|
||||
* Deprecated, use randomBackendV2 instead
|
||||
*/
|
||||
@Deprecated
|
||||
@VisibleForTesting
|
||||
public static String randomBackend(SparkSettings sparkSettings , Logger logger) throws DorisException, IOException {
|
||||
String feNodes = sparkSettings.getProperty(DORIS_FENODES);
|
||||
String feNode = randomEndpoint(feNodes, logger);
|
||||
String beUrl = String.format("http://%s" + BACKENDS,feNode);
|
||||
String beUrl = String.format("http://%s" + BACKENDS, feNode);
|
||||
HttpGet httpGet = new HttpGet(beUrl);
|
||||
String response = send(sparkSettings,httpGet, logger);
|
||||
logger.info("Backend Info:{}",response);
|
||||
String response = send(sparkSettings, httpGet, logger);
|
||||
logger.info("Backend Info:{}", response);
|
||||
List<BackendRow> backends = parseBackend(response, logger);
|
||||
logger.trace("Parse beNodes '{}'.", backends);
|
||||
if (backends == null || backends.isEmpty()) {
|
||||
@ -497,7 +502,6 @@ public class RestService implements Serializable {
|
||||
return backend.getIP() + ":" + backend.getHttpPort();
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* translate Doris FE response to inner {@link BackendRow} struct.
|
||||
* @param response Doris FE response
|
||||
@ -505,6 +509,7 @@ public class RestService implements Serializable {
|
||||
* @return inner {@link List<BackendRow>} struct
|
||||
* @throws DorisException,IOException throw when translate failed
|
||||
* */
|
||||
@Deprecated
|
||||
@VisibleForTesting
|
||||
static List<BackendRow> parseBackend(String response, Logger logger) throws DorisException, IOException {
|
||||
com.fasterxml.jackson.databind.ObjectMapper mapper = new com.fasterxml.jackson.databind.ObjectMapper();
|
||||
@ -534,6 +539,59 @@ public class RestService implements Serializable {
|
||||
return backendRows;
|
||||
}
|
||||
|
||||
/**
|
||||
* choice a Doris BE node to request.
|
||||
* @param logger slf4j logger
|
||||
* @return the chosen one Doris BE node
|
||||
* @throws IllegalArgumentException BE nodes is illegal
|
||||
*/
|
||||
@VisibleForTesting
|
||||
public static String randomBackendV2(SparkSettings sparkSettings, Logger logger) throws DorisException {
|
||||
String feNodes = sparkSettings.getProperty(DORIS_FENODES);
|
||||
String feNode = randomEndpoint(feNodes, logger);
|
||||
String beUrl = String.format("http://%s" + BACKENDS_V2, feNode);
|
||||
HttpGet httpGet = new HttpGet(beUrl);
|
||||
String response = send(sparkSettings, httpGet, logger);
|
||||
logger.info("Backend Info:{}", response);
|
||||
List<BackendV2.BackendRowV2> backends = parseBackendV2(response, logger);
|
||||
logger.trace("Parse beNodes '{}'.", backends);
|
||||
if (backends == null || backends.isEmpty()) {
|
||||
logger.error(ILLEGAL_ARGUMENT_MESSAGE, "beNodes", backends);
|
||||
throw new IllegalArgumentException("beNodes", String.valueOf(backends));
|
||||
}
|
||||
Collections.shuffle(backends);
|
||||
BackendV2.BackendRowV2 backend = backends.get(0);
|
||||
return backend.getIp() + ":" + backend.getHttpPort();
|
||||
}
|
||||
|
||||
static List<BackendV2.BackendRowV2> parseBackendV2(String response, Logger logger) throws DorisException {
|
||||
com.fasterxml.jackson.databind.ObjectMapper mapper = new com.fasterxml.jackson.databind.ObjectMapper();
|
||||
BackendV2 backend;
|
||||
try {
|
||||
backend = mapper.readValue(response, BackendV2.class);
|
||||
} catch (com.fasterxml.jackson.core.JsonParseException e) {
|
||||
String errMsg = "Doris BE's response is not a json. res: " + response;
|
||||
logger.error(errMsg, e);
|
||||
throw new DorisException(errMsg, e);
|
||||
} catch (com.fasterxml.jackson.databind.JsonMappingException e) {
|
||||
String errMsg = "Doris BE's response cannot map to schema. res: " + response;
|
||||
logger.error(errMsg, e);
|
||||
throw new DorisException(errMsg, e);
|
||||
} catch (IOException e) {
|
||||
String errMsg = "Parse Doris BE's response to json failed. res: " + response;
|
||||
logger.error(errMsg, e);
|
||||
throw new DorisException(errMsg, e);
|
||||
}
|
||||
|
||||
if (backend == null) {
|
||||
logger.error(SHOULD_NOT_HAPPEN_MESSAGE);
|
||||
throw new ShouldNeverHappenException();
|
||||
}
|
||||
List<BackendV2.BackendRowV2> backendRows = backend.getBackends();
|
||||
logger.debug("Parsing schema result is '{}'.", backendRows);
|
||||
return backendRows;
|
||||
}
|
||||
|
||||
/**
|
||||
* translate BE tablets map to Doris RDD partition.
|
||||
* @param cfg configuration of request
|
||||
|
||||
@ -23,6 +23,7 @@ import java.util.List;
|
||||
/**
|
||||
* Be response model
|
||||
**/
|
||||
@Deprecated
|
||||
@JsonIgnoreProperties(ignoreUnknown = true)
|
||||
public class Backend {
|
||||
|
||||
|
||||
@ -15,8 +15,11 @@
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
package org.apache.doris.spark.rest.models;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
|
||||
@Deprecated
|
||||
@JsonIgnoreProperties(ignoreUnknown = true)
|
||||
public class BackendRow {
|
||||
|
||||
|
||||
@ -0,0 +1,72 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
package org.apache.doris.spark.rest.models;
|
||||
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Be response model
|
||||
**/
|
||||
@JsonIgnoreProperties(ignoreUnknown = true)
|
||||
public class BackendV2 {
|
||||
|
||||
@JsonProperty(value = "backends")
|
||||
private List<BackendRowV2> backends;
|
||||
|
||||
public List<BackendRowV2> getBackends() {
|
||||
return backends;
|
||||
}
|
||||
|
||||
public void setRows(List<BackendRowV2> rows) {
|
||||
this.backends = rows;
|
||||
}
|
||||
|
||||
public static class BackendRowV2 {
|
||||
@JsonProperty("ip")
|
||||
public String ip;
|
||||
@JsonProperty("http_port")
|
||||
public int httpPort;
|
||||
@JsonProperty("is_alive")
|
||||
public boolean isAlive;
|
||||
|
||||
public String getIp() {
|
||||
return ip;
|
||||
}
|
||||
|
||||
public void setIp(String ip) {
|
||||
this.ip = ip;
|
||||
}
|
||||
|
||||
public int getHttpPort() {
|
||||
return httpPort;
|
||||
}
|
||||
|
||||
public void setHttpPort(int httpPort) {
|
||||
this.httpPort = httpPort;
|
||||
}
|
||||
|
||||
public boolean isAlive() {
|
||||
return isAlive;
|
||||
}
|
||||
|
||||
public void setAlive(boolean alive) {
|
||||
isAlive = alive;
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -38,6 +38,7 @@ import org.apache.doris.spark.cfg.Settings;
|
||||
import org.apache.doris.spark.exception.DorisException;
|
||||
import org.apache.doris.spark.exception.IllegalArgumentException;
|
||||
import org.apache.doris.spark.rest.models.BackendRow;
|
||||
import org.apache.doris.spark.rest.models.BackendV2;
|
||||
import org.apache.doris.spark.rest.models.Field;
|
||||
import org.apache.doris.spark.rest.models.QueryPlan;
|
||||
import org.apache.doris.spark.rest.models.Schema;
|
||||
@ -49,6 +50,8 @@ import org.junit.rules.ExpectedException;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import jdk.nashorn.internal.ir.annotations.Ignore;
|
||||
|
||||
public class TestRestService {
|
||||
private static Logger logger = LoggerFactory.getLogger(TestRestService.class);
|
||||
|
||||
@ -295,7 +298,8 @@ public class TestRestService {
|
||||
Assert.assertEquals(expected, actual);
|
||||
}
|
||||
|
||||
@Test
|
||||
@Deprecated
|
||||
@Ignore
|
||||
public void testParseBackend() throws Exception {
|
||||
String response = "{\"href_columns\":[\"BackendId\"],\"parent_url\":\"/rest/v1/system?path=/\"," +
|
||||
"\"column_names\":[\"BackendId\",\"Cluster\",\"IP\",\"HostName\",\"HeartbeatPort\",\"BePort\"," +
|
||||
@ -313,4 +317,11 @@ public class TestRestService {
|
||||
List<BackendRow> backendRows = RestService.parseBackend(response, logger);
|
||||
Assert.assertTrue(backendRows != null && !backendRows.isEmpty());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testParseBackendV2() throws Exception {
|
||||
String response = "{\"backends\":[{\"ip\":\"192.168.1.1\",\"http_port\":8042,\"is_alive\":true}, {\"ip\":\"192.168.1.2\",\"http_port\":8042,\"is_alive\":true}]}";
|
||||
List<BackendV2.BackendRowV2> backendRows = RestService.parseBackendV2(response, logger);
|
||||
Assert.assertEquals(2, backendRows.size());
|
||||
}
|
||||
}
|
||||
|
||||
@ -19,8 +19,12 @@ package org.apache.doris.spark.sql
|
||||
|
||||
import org.apache.spark.sql.SparkSession
|
||||
import org.apache.spark.{SparkConf, SparkContext}
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test
|
||||
|
||||
// This test need real connect info to run.
|
||||
// Set the connect info before comment out this @Ignore
|
||||
@Ignore
|
||||
class TestSparkConnector {
|
||||
val dorisFeNodes = "your_fe_host:8030"
|
||||
val dorisUser = "root"
|
||||
@ -107,3 +111,4 @@ class TestSparkConnector {
|
||||
.start().awaitTermination()
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -0,0 +1,112 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.httpv2.rest;
|
||||
|
||||
import org.apache.doris.catalog.Catalog;
|
||||
import org.apache.doris.httpv2.entity.ResponseEntityBuilder;
|
||||
import org.apache.doris.system.Backend;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import com.google.common.base.Strings;
|
||||
import com.google.common.collect.Lists;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.springframework.web.bind.annotation.RequestMapping;
|
||||
import org.springframework.web.bind.annotation.RequestMethod;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
|
||||
import java.util.List;
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
import javax.servlet.http.HttpServletResponse;
|
||||
|
||||
import lombok.Getter;
|
||||
import lombok.Setter;
|
||||
|
||||
/**
|
||||
* This class responsible for returning current backends info.
|
||||
* Mainly used for flink/spark connector, which need backends info to execute stream load.
|
||||
* It only requires password, no auth check.
|
||||
* <p>
|
||||
* Response:
|
||||
* <p>
|
||||
* {
|
||||
* "msg": "success",
|
||||
* "code": 0,
|
||||
* "data": {
|
||||
* "backends": [
|
||||
* {
|
||||
* "ip": "192.1.1.1",
|
||||
* "http_port": 8040,
|
||||
* "is_alive": true
|
||||
* }
|
||||
* ]
|
||||
* },
|
||||
* "count": 0
|
||||
* }
|
||||
*/
|
||||
@RestController
|
||||
public class BackendsAction extends RestBaseController {
|
||||
public static final Logger LOG = LogManager.getLogger(BackendsAction.class);
|
||||
|
||||
private static final String IS_ALIVE = "is_alive";
|
||||
|
||||
@RequestMapping(path = "/api/backends", method = {RequestMethod.GET})
|
||||
public Object getBackends(HttpServletRequest request, HttpServletResponse response) {
|
||||
executeCheckPassword(request, response);
|
||||
|
||||
boolean needAlive = false;
|
||||
String isAlive = request.getParameter(IS_ALIVE);
|
||||
if (!Strings.isNullOrEmpty(isAlive) && isAlive.equalsIgnoreCase("true")) {
|
||||
needAlive = true;
|
||||
}
|
||||
|
||||
BackendInfo backendInfo = new BackendInfo();
|
||||
backendInfo.backends = Lists.newArrayList();
|
||||
List<Long> beIds = Catalog.getCurrentSystemInfo().getBackendIds(needAlive);
|
||||
for (Long beId : beIds) {
|
||||
Backend be = Catalog.getCurrentSystemInfo().getBackend(beId);
|
||||
if (be != null) {
|
||||
BackendRow backendRow = new BackendRow();
|
||||
backendRow.ip = be.getHost();
|
||||
backendRow.httpPort = be.getHttpPort();
|
||||
backendRow.isAlive = be.isAlive();
|
||||
backendInfo.backends.add(backendRow);
|
||||
}
|
||||
}
|
||||
return ResponseEntityBuilder.ok(backendInfo);
|
||||
}
|
||||
|
||||
@Getter
|
||||
@Setter
|
||||
public static class BackendInfo {
|
||||
@JsonProperty("backends")
|
||||
public List<BackendRow> backends;
|
||||
}
|
||||
|
||||
@Getter
|
||||
@Setter
|
||||
public static class BackendRow {
|
||||
@JsonProperty("ip")
|
||||
public String ip;
|
||||
@JsonProperty("http_port")
|
||||
public int httpPort;
|
||||
@JsonProperty("is_alive")
|
||||
public boolean isAlive;
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user