bp #40444
This commit is contained in:
@ -23,8 +23,10 @@ import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.fasterxml.jackson.databind.node.ArrayNode;
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import okhttp3.Credentials;
|
||||
import okhttp3.MediaType;
|
||||
import okhttp3.OkHttpClient;
|
||||
import okhttp3.Request;
|
||||
import okhttp3.RequestBody;
|
||||
import okhttp3.Response;
|
||||
import org.apache.http.HttpHeaders;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
@ -125,12 +127,27 @@ public class EsRestClient {
|
||||
return indexMapping;
|
||||
}
|
||||
|
||||
/**
|
||||
* Search specific index
|
||||
*/
|
||||
public String searchIndex(String indexName, String body) throws DorisEsException {
|
||||
String path = indexName + "/_search";
|
||||
RequestBody requestBody = null;
|
||||
if (Strings.isNotEmpty(body)) {
|
||||
requestBody = RequestBody.create(
|
||||
body,
|
||||
MediaType.get("application/json")
|
||||
);
|
||||
}
|
||||
return executeWithRequestBody(path, requestBody);
|
||||
}
|
||||
|
||||
/**
|
||||
* Check whether index exist.
|
||||
**/
|
||||
public boolean existIndex(OkHttpClient httpClient, String indexName) {
|
||||
String path = indexName + "/_mapping";
|
||||
try (Response response = executeResponse(httpClient, path)) {
|
||||
try (Response response = executeResponse(httpClient, path, null)) {
|
||||
if (response.isSuccessful()) {
|
||||
return true;
|
||||
}
|
||||
@ -227,7 +244,7 @@ public class EsRestClient {
|
||||
return sslNetworkClient;
|
||||
}
|
||||
|
||||
private Response executeResponse(OkHttpClient httpClient, String path) throws IOException {
|
||||
private Response executeResponse(OkHttpClient httpClient, String path, RequestBody requestBody) throws IOException {
|
||||
currentNode = currentNode.trim();
|
||||
if (!(currentNode.startsWith("http://") || currentNode.startsWith("https://"))) {
|
||||
currentNode = "http://" + currentNode;
|
||||
@ -235,20 +252,29 @@ public class EsRestClient {
|
||||
if (!currentNode.endsWith("/")) {
|
||||
currentNode = currentNode + "/";
|
||||
}
|
||||
Request request = builder.get().url(currentNode + path).build();
|
||||
Request request;
|
||||
if (requestBody != null) {
|
||||
request = builder.post(requestBody).url(currentNode + path).build();
|
||||
} else {
|
||||
request = builder.get().url(currentNode + path).build();
|
||||
}
|
||||
if (LOG.isInfoEnabled()) {
|
||||
LOG.info("es rest client request URL: {}", request.url().toString());
|
||||
}
|
||||
return httpClient.newCall(request).execute();
|
||||
}
|
||||
|
||||
private String execute(String path) throws DorisEsException {
|
||||
return executeWithRequestBody(path, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* execute request for specific path,it will try again nodes.length times if it fails
|
||||
*
|
||||
* @param path the path must not leading with '/'
|
||||
* @return response
|
||||
*/
|
||||
private String execute(String path) throws DorisEsException {
|
||||
private String executeWithRequestBody(String path, RequestBody requestBody) throws DorisEsException {
|
||||
// try 3 times for every node
|
||||
int retrySize = nodes.length * 3;
|
||||
DorisEsException scratchExceptionForThrow = null;
|
||||
@ -268,7 +294,7 @@ public class EsRestClient {
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("es rest client request URL: {}", currentNode + "/" + path);
|
||||
}
|
||||
try (Response response = executeResponse(httpClient, path)) {
|
||||
try (Response response = executeResponse(httpClient, path, requestBody)) {
|
||||
if (response.isSuccessful()) {
|
||||
return response.body().string();
|
||||
} else {
|
||||
|
||||
@ -0,0 +1,108 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.httpv2.restv2;
|
||||
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.common.util.JsonUtil;
|
||||
import org.apache.doris.datasource.CatalogIf;
|
||||
import org.apache.doris.datasource.es.EsExternalCatalog;
|
||||
import org.apache.doris.httpv2.entity.ResponseEntityBuilder;
|
||||
import org.apache.doris.httpv2.rest.RestBaseController;
|
||||
|
||||
import com.fasterxml.jackson.databind.node.ObjectNode;
|
||||
import com.google.common.collect.Maps;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.springframework.web.bind.annotation.RequestMapping;
|
||||
import org.springframework.web.bind.annotation.RequestMethod;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
import java.util.function.BiFunction;
|
||||
import java.util.stream.Collectors;
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
import javax.servlet.http.HttpServletResponse;
|
||||
|
||||
@RestController
|
||||
@RequestMapping("/rest/v2/api/es_catalog")
|
||||
public class ESCatalogAction extends RestBaseController {
|
||||
|
||||
private static final Logger LOG = LogManager.getLogger(ESCatalogAction.class);
|
||||
private static final String CATALOG = "catalog";
|
||||
private static final String TABLE = "table";
|
||||
|
||||
private Object handleRequest(HttpServletRequest request, HttpServletResponse response,
|
||||
BiFunction<EsExternalCatalog, String, String> action) {
|
||||
if (Config.enable_all_http_auth) {
|
||||
executeCheckPassword(request, response);
|
||||
}
|
||||
|
||||
try {
|
||||
if (!Env.getCurrentEnv().isMaster()) {
|
||||
return redirectToMasterOrException(request, response);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
return ResponseEntityBuilder.okWithCommonError(e.getMessage());
|
||||
}
|
||||
|
||||
Map<String, Object> resultMap = Maps.newHashMap();
|
||||
Env env = Env.getCurrentEnv();
|
||||
String catalogName = request.getParameter(CATALOG);
|
||||
String tableName = request.getParameter(TABLE);
|
||||
CatalogIf catalog = env.getCatalogMgr().getCatalog(catalogName);
|
||||
if (!(catalog instanceof EsExternalCatalog)) {
|
||||
return ResponseEntityBuilder.badRequest("unknown ES Catalog: " + catalogName);
|
||||
}
|
||||
EsExternalCatalog esExternalCatalog = (EsExternalCatalog) catalog;
|
||||
esExternalCatalog.makeSureInitialized();
|
||||
String result = action.apply(esExternalCatalog, tableName);
|
||||
ObjectNode jsonResult = JsonUtil.parseObject(result);
|
||||
|
||||
resultMap.put("catalog", catalogName);
|
||||
resultMap.put("table", tableName);
|
||||
resultMap.put("result", jsonResult);
|
||||
|
||||
return ResponseEntityBuilder.ok(resultMap);
|
||||
}
|
||||
|
||||
@RequestMapping(path = "/get_mapping", method = RequestMethod.GET)
|
||||
public Object getMapping(HttpServletRequest request, HttpServletResponse response) {
|
||||
return handleRequest(request, response, (esExternalCatalog, tableName) ->
|
||||
esExternalCatalog.getEsRestClient().getMapping(tableName));
|
||||
}
|
||||
|
||||
@RequestMapping(path = "/search", method = RequestMethod.POST)
|
||||
public Object search(HttpServletRequest request, HttpServletResponse response) {
|
||||
String body;
|
||||
try {
|
||||
body = getRequestBody(request);
|
||||
} catch (IOException e) {
|
||||
return ResponseEntityBuilder.okWithCommonError(e.getMessage());
|
||||
}
|
||||
return handleRequest(request, response, (esExternalCatalog, tableName) ->
|
||||
esExternalCatalog.getEsRestClient().searchIndex(tableName, body));
|
||||
}
|
||||
|
||||
private String getRequestBody(HttpServletRequest request) throws IOException {
|
||||
BufferedReader reader = request.getReader();
|
||||
return reader.lines().collect(Collectors.joining(System.lineSeparator()));
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user