[fix](rest)check response code when get image (#22272)

This commit is contained in:
zhangdong
2023-08-06 10:34:10 +08:00
committed by GitHub
parent 59723a1883
commit fce78fff92
11 changed files with 206 additions and 89 deletions

View File

@ -140,7 +140,9 @@ import org.apache.doris.ha.BDBHA;
import org.apache.doris.ha.FrontendNodeType;
import org.apache.doris.ha.HAProtocol;
import org.apache.doris.ha.MasterInfo;
import org.apache.doris.httpv2.entity.ResponseBody;
import org.apache.doris.httpv2.meta.MetaBaseAction;
import org.apache.doris.httpv2.rest.RestApiStatusCode;
import org.apache.doris.journal.JournalCursor;
import org.apache.doris.journal.JournalEntity;
import org.apache.doris.journal.bdbje.Timestamp;
@ -191,7 +193,6 @@ import org.apache.doris.persist.ReplicaPersistInfo;
import org.apache.doris.persist.SetReplicaStatusOperationLog;
import org.apache.doris.persist.Storage;
import org.apache.doris.persist.StorageInfo;
import org.apache.doris.persist.StorageInfoV2;
import org.apache.doris.persist.TableInfo;
import org.apache.doris.persist.TablePropertyInfo;
import org.apache.doris.persist.TableRenameColumnInfo;
@ -239,7 +240,6 @@ import org.apache.doris.transaction.DbUsedDataQuotaInfoCollector;
import org.apache.doris.transaction.GlobalTransactionMgr;
import org.apache.doris.transaction.PublishVersionDaemon;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
@ -256,11 +256,9 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.BufferedReader;
import java.io.DataInputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.util.ArrayList;
import java.util.Collection;
@ -1637,7 +1635,7 @@ public class Env {
+ "/version";
File dir = new File(this.imageDir);
MetaHelper.getRemoteFile(url, HTTP_TIMEOUT_SECOND * 1000,
MetaHelper.getOutputStream(Storage.VERSION_FILE, dir));
MetaHelper.getFile(Storage.VERSION_FILE, dir));
MetaHelper.complete(Storage.VERSION_FILE, dir);
return true;
} catch (Exception e) {
@ -1655,13 +1653,19 @@ public class Env {
try {
String hostPort = NetUtils.getHostPortInAccessibleFormat(helperNode.getHost(), Config.http_port);
String infoUrl = "http://" + hostPort + "/info";
StorageInfo info = getStorageInfo(infoUrl);
ResponseBody<StorageInfo> responseBody = MetaHelper
.doGet(infoUrl, HTTP_TIMEOUT_SECOND * 1000, StorageInfo.class);
if (responseBody.getCode() != RestApiStatusCode.OK.code) {
LOG.warn("get image failed,responseBody:{}", responseBody);
throw new IOException(responseBody.toString());
}
StorageInfo info = responseBody.getData();
long version = info.getImageSeq();
if (version > localImageVersion) {
String url = "http://" + hostPort + "/image?version=" + version;
String filename = Storage.IMAGE + "." + version;
File dir = new File(this.imageDir);
MetaHelper.getRemoteFile(url, HTTP_TIMEOUT_SECOND * 1000, MetaHelper.getOutputStream(filename, dir));
MetaHelper.getRemoteFile(url, HTTP_TIMEOUT_SECOND * 1000, MetaHelper.getFile(filename, dir));
MetaHelper.complete(filename, dir);
} else {
LOG.warn("get an image with a lower version, localImageVersion: {}, got version: {}",
@ -1693,42 +1697,6 @@ public class Env {
return containSelf;
}
private StorageInfo getStorageInfo(String url) throws IOException {
ObjectMapper mapper = new ObjectMapper();
HttpURLConnection connection = null;
try {
connection = HttpURLUtil.getConnectionWithNodeIdent(url);
connection.setConnectTimeout(HTTP_TIMEOUT_SECOND * 1000);
connection.setReadTimeout(HTTP_TIMEOUT_SECOND * 1000);
String response;
try (BufferedReader bufferedReader = new BufferedReader(
new InputStreamReader(connection.getInputStream()))) {
String line;
StringBuilder sb = new StringBuilder();
while ((line = bufferedReader.readLine()) != null) {
sb.append(line);
}
response = sb.toString();
}
// For http v2, the response body for "/info" api changed from
// StorageInfo to StorageInfoV2.
// So we need to make it compatible with old api.
try {
return mapper.readValue(response, StorageInfo.class);
} catch (Exception e) {
// try new response body
return mapper.readValue(response, StorageInfoV2.class).data;
}
} finally {
if (connection != null) {
connection.disconnect();
}
}
}
public StatisticsCache getStatisticsCache() {
return analysisManager.getStatisticsCache();
}

View File

@ -21,9 +21,12 @@ package org.apache.doris.common.util;
import org.apache.doris.catalog.Env;
import org.apache.doris.system.SystemInfoService.HostInfo;
import com.google.common.collect.Maps;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.Map;
public class HttpURLUtil {
@ -38,4 +41,15 @@ public class HttpURLUtil {
conn.setRequestProperty(Env.CLIENT_NODE_PORT_KEY, selfNode.getPort() + "");
return conn;
}
public static Map<String, String> getNodeIdentHeaders() throws IOException {
Map<String, String> headers = Maps.newHashMap();
// Must use Env.getServingEnv() instead of getCurrentEnv(),
// because here we need to obtain selfNode through the official service catalog.
HostInfo selfNode = Env.getServingEnv().getSelfNode();
headers.put(Env.CLIENT_NODE_HOST_KEY, selfNode.getHost());
headers.put(Env.CLIENT_NODE_PORT_KEY, selfNode.getPort() + "");
return headers;
}
}

View File

@ -19,6 +19,8 @@ package org.apache.doris.httpv2.entity;
import org.apache.doris.httpv2.rest.RestApiStatusCode;
import java.util.Objects;
/**
* The response body of restful api.
* <p>
@ -96,4 +98,34 @@ public class ResponseBody<T> {
this.msg = msg;
return this;
}
@Override
public String toString() {
return "ResponseBody{"
+ "msg='" + msg + '\''
+ ", code=" + code
+ ", data=" + data
+ ", count=" + count
+ '}';
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ResponseBody<?> that = (ResponseBody<?>) o;
return code == that.code
&& count == that.count
&& Objects.equals(msg, that.msg)
&& Objects.equals(data, that.data);
}
@Override
public int hashCode() {
return Objects.hash(msg, code, data, count);
}
}

View File

@ -41,7 +41,6 @@ import org.springframework.web.bind.annotation.RestController;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Map;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@ -163,8 +162,7 @@ public class MetaService extends RestBaseController {
String filename = Storage.IMAGE + "." + versionStr;
File dir = new File(Env.getCurrentEnv().getImageDir());
try {
OutputStream out = MetaHelper.getOutputStream(filename, dir);
MetaHelper.getRemoteFile(url, TIMEOUT_SECOND * 1000, out);
MetaHelper.getRemoteFile(url, TIMEOUT_SECOND * 1000, MetaHelper.getFile(filename, dir));
MetaHelper.complete(filename, dir);
} catch (FileNotFoundException e) {
return ResponseEntityBuilder.notFound("file not found.");

View File

@ -24,12 +24,14 @@ import org.apache.doris.common.Config;
import org.apache.doris.httpv2.controller.BaseController;
import org.apache.doris.httpv2.entity.ResponseEntityBuilder;
import org.apache.doris.httpv2.exception.UnauthorizedException;
import org.apache.doris.master.MetaHelper;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.TNetworkAddress;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.http.HttpStatus;
@ -175,7 +177,8 @@ public class RestBaseController extends BaseController {
Preconditions.checkArgument(imageFile != null && imageFile.exists());
response.setHeader("Content-type", "application/octet-stream");
response.addHeader("Content-Disposition", "attachment;fileName=" + imageFile.getName());
response.setHeader("X-Image-Size", imageFile.length() + "");
response.setHeader(MetaHelper.X_IMAGE_SIZE, imageFile.length() + "");
response.setHeader(MetaHelper.X_IMAGE_MD5, DigestUtils.md5Hex(new FileInputStream(imageFile)));
getFile(request, response, imageFile, imageFile.getName());
}

View File

@ -49,6 +49,7 @@ import javax.servlet.http.HttpServletRequest;
*/
public class HttpUtils {
static final int REQUEST_SUCCESS_CODE = 0;
static final int DEFAULT_TIME_OUT_MS = 2000;
static List<Pair<String, Integer>> getFeList() {
return Env.getCurrentEnv().getFrontends(null)
@ -74,12 +75,16 @@ public class HttpUtils {
return url.toString();
}
static String doGet(String url, Map<String, String> headers) throws IOException {
public static String doGet(String url, Map<String, String> headers, int timeoutMs) throws IOException {
HttpGet httpGet = new HttpGet(url);
setRequestConfig(httpGet, headers);
setRequestConfig(httpGet, headers, timeoutMs);
return executeRequest(httpGet);
}
public static String doGet(String url, Map<String, String> headers) throws IOException {
return doGet(url, headers, DEFAULT_TIME_OUT_MS);
}
static String doPost(String url, Map<String, String> headers, Object body) throws IOException {
HttpPost httpPost = new HttpPost(url);
if (Objects.nonNull(body)) {
@ -88,11 +93,11 @@ public class HttpUtils {
httpPost.setEntity(stringEntity);
}
setRequestConfig(httpPost, headers);
setRequestConfig(httpPost, headers, DEFAULT_TIME_OUT_MS);
return executeRequest(httpPost);
}
private static void setRequestConfig(HttpRequestBase request, Map<String, String> headers) {
private static void setRequestConfig(HttpRequestBase request, Map<String, String> headers, int timeoutMs) {
if (null != headers) {
for (String key : headers.keySet()) {
request.setHeader(key, headers.get(key));
@ -100,9 +105,9 @@ public class HttpUtils {
}
RequestConfig config = RequestConfig.custom()
.setConnectTimeout(2000)
.setConnectionRequestTimeout(2000)
.setSocketTimeout(2000)
.setConnectTimeout(timeoutMs)
.setConnectionRequestTimeout(timeoutMs)
.setSocketTimeout(timeoutMs)
.build();
request.setConfig(config);
}

View File

@ -24,6 +24,8 @@ import org.apache.doris.common.FeConstants;
import org.apache.doris.common.util.HttpURLUtil;
import org.apache.doris.common.util.MasterDaemon;
import org.apache.doris.common.util.NetUtils;
import org.apache.doris.httpv2.entity.ResponseBody;
import org.apache.doris.httpv2.rest.RestApiStatusCode;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.monitor.jvm.JvmService;
import org.apache.doris.monitor.jvm.JvmStats;
@ -201,8 +203,12 @@ public class Checkpoint extends MasterDaemon {
LOG.info("Put image:{}", url);
try {
MetaHelper.getRemoteFile(url, PUT_TIMEOUT_SECOND * 1000, new NullOutputStream());
successPushed++;
ResponseBody responseBody = MetaHelper.doGet(url, PUT_TIMEOUT_SECOND * 1000, Object.class);
if (responseBody.getCode() == RestApiStatusCode.OK.code) {
successPushed++;
} else {
LOG.warn("Failed when pushing image file. url = {},responseBody = {}", url, responseBody);
}
} catch (IOException e) {
LOG.error("Exception when pushing image file. url = {}", url, e);
}

View File

@ -20,18 +20,27 @@ package org.apache.doris.master;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.io.IOUtils;
import org.apache.doris.common.util.HttpURLUtil;
import org.apache.doris.httpv2.entity.ResponseBody;
import org.apache.doris.httpv2.rest.manager.HttpUtils;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.commons.codec.digest.DigestUtils;
import java.io.BufferedInputStream;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.HttpURLConnection;
public class MetaHelper {
private static final String PART_SUFFIX = ".part";
public static final String X_IMAGE_SIZE = "X-Image-Size";
public static final String X_IMAGE_MD5 = "X-Image-Md5";
private static final int BUFFER_BYTES = 8 * 1024;
private static final int CHECKPOINT_LIMIT_BYTES = 30 * 1024 * 1024;
@ -60,11 +69,20 @@ public class MetaHelper {
return new FileOutputStream(file);
}
public static File getFile(String filename, File dir) {
return new File(dir, filename + MetaHelper.PART_SUFFIX);
}
public static <T> ResponseBody doGet(String url, int timeout, Class<T> clazz) throws IOException {
String response = HttpUtils.doGet(url, HttpURLUtil.getNodeIdentHeaders(), timeout);
return parseResponse(response, clazz);
}
// download file from remote node
public static void getRemoteFile(String urlStr, int timeout, OutputStream out)
public static void getRemoteFile(String urlStr, int timeout, File file)
throws IOException {
HttpURLConnection conn = null;
OutputStream out = new FileOutputStream(file);
try {
conn = HttpURLUtil.getConnectionWithNodeIdent(urlStr);
conn.setConnectTimeout(timeout);
@ -76,7 +94,10 @@ public class MetaHelper {
if (imageSizeStr != null) {
imageSize = Long.parseLong(imageSizeStr);
}
if (imageSize < 0) {
throw new IOException(getResponse(conn));
}
String remoteMd5 = conn.getHeaderField(X_IMAGE_MD5);
BufferedInputStream bin = new BufferedInputStream(conn.getInputStream());
// Do not limit speed in client side.
@ -85,6 +106,14 @@ public class MetaHelper {
if ((imageSize > 0) && (bytes != imageSize)) {
throw new IOException("Unexpected image size, expected: " + imageSize + ", actual: " + bytes);
}
// if remoteMd5 not null ,we need check md5
if (remoteMd5 != null) {
String localMd5 = DigestUtils.md5Hex(new FileInputStream(file));
if (!remoteMd5.equals(localMd5)) {
throw new IOException("Unexpected image md5, expected: " + remoteMd5 + ", actual: " + localMd5);
}
}
} finally {
if (conn != null) {
conn.disconnect();
@ -95,4 +124,23 @@ public class MetaHelper {
}
}
public static String getResponse(HttpURLConnection conn) throws IOException {
String response;
try (BufferedReader bufferedReader = new BufferedReader(
new InputStreamReader(conn.getInputStream()))) {
String line;
StringBuilder sb = new StringBuilder();
while ((line = bufferedReader.readLine()) != null) {
sb.append(line);
}
response = sb.toString();
}
return response;
}
public static <T> ResponseBody parseResponse(String response, Class<T> clazz) {
return GsonUtils.GSON.fromJson(response,
com.google.gson.internal.$Gson$Types.newParameterizedTypeWithOwner(null, ResponseBody.class, clazz));
}
}

View File

@ -17,6 +17,8 @@
package org.apache.doris.persist;
import java.util.Objects;
/**
* This class is designed for sending storage information from master to standby master.
* StorageInfo is easier to serialize to a Json String than class Storage
@ -59,4 +61,23 @@ public class StorageInfo {
public void setImageSeq(long imageSeq) {
this.imageSeq = imageSeq;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
StorageInfo that = (StorageInfo) o;
return clusterID == that.clusterID
&& imageSeq == that.imageSeq
&& editsSeq == that.editsSeq;
}
@Override
public int hashCode() {
return Objects.hash(clusterID, imageSeq, editsSeq);
}
}

View File

@ -1,30 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.persist;
/**
* This class is wrapper of StorageInfo.
* Because for http v2, the response body of "/info" api changed to:
* {"msg":"success","code":0,"data":{"clusterID":1464655034,"imageSeq":809779,"editsSeq":0},"count":0}
*/
public class StorageInfoV2 {
public String msg;
public int code;
public StorageInfo data;
public int count;
}

View File

@ -0,0 +1,52 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.master;
import org.apache.doris.httpv2.entity.ResponseBody;
import org.apache.doris.httpv2.rest.RestApiStatusCode;
import org.apache.doris.persist.StorageInfo;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.Assert;
import org.junit.Test;
public class MetaHelperTest {
@Test
public void test() throws JsonProcessingException {
ResponseBody<StorageInfo> bodyBefore = buildResponseBody();
ObjectMapper mapper = new ObjectMapper();
String bodyStr = mapper.writeValueAsString(bodyBefore);
ResponseBody<StorageInfo> bodyAfter = MetaHelper.parseResponse(bodyStr, StorageInfo.class);
Assert.assertEquals(bodyAfter, bodyBefore);
}
private ResponseBody<StorageInfo> buildResponseBody() {
StorageInfo infoBefore = new StorageInfo();
infoBefore.setClusterID(1);
infoBefore.setEditsSeq(2L);
infoBefore.setImageSeq(3L);
ResponseBody<StorageInfo> bodyBefore = new ResponseBody<>();
bodyBefore.setCode(RestApiStatusCode.UNAUTHORIZED.code);
bodyBefore.setCount(5);
bodyBefore.setData(infoBefore);
bodyBefore.setMsg("msg");
return bodyBefore;
}
}