diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index 9a286ad98c..38abe34337 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -140,7 +140,9 @@ import org.apache.doris.ha.BDBHA; import org.apache.doris.ha.FrontendNodeType; import org.apache.doris.ha.HAProtocol; import org.apache.doris.ha.MasterInfo; +import org.apache.doris.httpv2.entity.ResponseBody; import org.apache.doris.httpv2.meta.MetaBaseAction; +import org.apache.doris.httpv2.rest.RestApiStatusCode; import org.apache.doris.journal.JournalCursor; import org.apache.doris.journal.JournalEntity; import org.apache.doris.journal.bdbje.Timestamp; @@ -191,7 +193,6 @@ import org.apache.doris.persist.ReplicaPersistInfo; import org.apache.doris.persist.SetReplicaStatusOperationLog; import org.apache.doris.persist.Storage; import org.apache.doris.persist.StorageInfo; -import org.apache.doris.persist.StorageInfoV2; import org.apache.doris.persist.TableInfo; import org.apache.doris.persist.TablePropertyInfo; import org.apache.doris.persist.TableRenameColumnInfo; @@ -239,7 +240,6 @@ import org.apache.doris.transaction.DbUsedDataQuotaInfoCollector; import org.apache.doris.transaction.GlobalTransactionMgr; import org.apache.doris.transaction.PublishVersionDaemon; -import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Joiner; import com.google.common.base.Preconditions; import com.google.common.base.Strings; @@ -256,11 +256,9 @@ import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import java.io.BufferedReader; import java.io.DataInputStream; import java.io.File; import java.io.IOException; -import java.io.InputStreamReader; import java.net.HttpURLConnection; import java.util.ArrayList; import java.util.Collection; @@ -1637,7 +1635,7 @@ public class Env { + "/version"; File dir = new File(this.imageDir); MetaHelper.getRemoteFile(url, HTTP_TIMEOUT_SECOND * 1000, - MetaHelper.getOutputStream(Storage.VERSION_FILE, dir)); + MetaHelper.getFile(Storage.VERSION_FILE, dir)); MetaHelper.complete(Storage.VERSION_FILE, dir); return true; } catch (Exception e) { @@ -1655,13 +1653,19 @@ public class Env { try { String hostPort = NetUtils.getHostPortInAccessibleFormat(helperNode.getHost(), Config.http_port); String infoUrl = "http://" + hostPort + "/info"; - StorageInfo info = getStorageInfo(infoUrl); + ResponseBody responseBody = MetaHelper + .doGet(infoUrl, HTTP_TIMEOUT_SECOND * 1000, StorageInfo.class); + if (responseBody.getCode() != RestApiStatusCode.OK.code) { + LOG.warn("get image failed,responseBody:{}", responseBody); + throw new IOException(responseBody.toString()); + } + StorageInfo info = responseBody.getData(); long version = info.getImageSeq(); if (version > localImageVersion) { String url = "http://" + hostPort + "/image?version=" + version; String filename = Storage.IMAGE + "." + version; File dir = new File(this.imageDir); - MetaHelper.getRemoteFile(url, HTTP_TIMEOUT_SECOND * 1000, MetaHelper.getOutputStream(filename, dir)); + MetaHelper.getRemoteFile(url, HTTP_TIMEOUT_SECOND * 1000, MetaHelper.getFile(filename, dir)); MetaHelper.complete(filename, dir); } else { LOG.warn("get an image with a lower version, localImageVersion: {}, got version: {}", @@ -1693,42 +1697,6 @@ public class Env { return containSelf; } - private StorageInfo getStorageInfo(String url) throws IOException { - ObjectMapper mapper = new ObjectMapper(); - - HttpURLConnection connection = null; - try { - connection = HttpURLUtil.getConnectionWithNodeIdent(url); - connection.setConnectTimeout(HTTP_TIMEOUT_SECOND * 1000); - connection.setReadTimeout(HTTP_TIMEOUT_SECOND * 1000); - - String response; - try (BufferedReader bufferedReader = new BufferedReader( - new InputStreamReader(connection.getInputStream()))) { - String line; - StringBuilder sb = new StringBuilder(); - while ((line = bufferedReader.readLine()) != null) { - sb.append(line); - } - response = sb.toString(); - } - - // For http v2, the response body for "/info" api changed from - // StorageInfo to StorageInfoV2. - // So we need to make it compatible with old api. - try { - return mapper.readValue(response, StorageInfo.class); - } catch (Exception e) { - // try new response body - return mapper.readValue(response, StorageInfoV2.class).data; - } - } finally { - if (connection != null) { - connection.disconnect(); - } - } - } - public StatisticsCache getStatisticsCache() { return analysisManager.getStatisticsCache(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/HttpURLUtil.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/HttpURLUtil.java index 966bc6c66e..23026432f5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/HttpURLUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/HttpURLUtil.java @@ -21,9 +21,12 @@ package org.apache.doris.common.util; import org.apache.doris.catalog.Env; import org.apache.doris.system.SystemInfoService.HostInfo; +import com.google.common.collect.Maps; + import java.io.IOException; import java.net.HttpURLConnection; import java.net.URL; +import java.util.Map; public class HttpURLUtil { @@ -38,4 +41,15 @@ public class HttpURLUtil { conn.setRequestProperty(Env.CLIENT_NODE_PORT_KEY, selfNode.getPort() + ""); return conn; } + + public static Map getNodeIdentHeaders() throws IOException { + Map headers = Maps.newHashMap(); + // Must use Env.getServingEnv() instead of getCurrentEnv(), + // because here we need to obtain selfNode through the official service catalog. + HostInfo selfNode = Env.getServingEnv().getSelfNode(); + headers.put(Env.CLIENT_NODE_HOST_KEY, selfNode.getHost()); + headers.put(Env.CLIENT_NODE_PORT_KEY, selfNode.getPort() + ""); + return headers; + } + } diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/entity/ResponseBody.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/entity/ResponseBody.java index 9738e7e4ae..489d93a32d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/entity/ResponseBody.java +++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/entity/ResponseBody.java @@ -19,6 +19,8 @@ package org.apache.doris.httpv2.entity; import org.apache.doris.httpv2.rest.RestApiStatusCode; +import java.util.Objects; + /** * The response body of restful api. *

@@ -96,4 +98,34 @@ public class ResponseBody { this.msg = msg; return this; } + + @Override + public String toString() { + return "ResponseBody{" + + "msg='" + msg + '\'' + + ", code=" + code + + ", data=" + data + + ", count=" + count + + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ResponseBody that = (ResponseBody) o; + return code == that.code + && count == that.count + && Objects.equals(msg, that.msg) + && Objects.equals(data, that.data); + } + + @Override + public int hashCode() { + return Objects.hash(msg, code, data, count); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/meta/MetaService.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/meta/MetaService.java index cc0cb47a63..ce71581f5c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/meta/MetaService.java +++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/meta/MetaService.java @@ -41,7 +41,6 @@ import org.springframework.web.bind.annotation.RestController; import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; -import java.io.OutputStream; import java.util.Map; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; @@ -163,8 +162,7 @@ public class MetaService extends RestBaseController { String filename = Storage.IMAGE + "." + versionStr; File dir = new File(Env.getCurrentEnv().getImageDir()); try { - OutputStream out = MetaHelper.getOutputStream(filename, dir); - MetaHelper.getRemoteFile(url, TIMEOUT_SECOND * 1000, out); + MetaHelper.getRemoteFile(url, TIMEOUT_SECOND * 1000, MetaHelper.getFile(filename, dir)); MetaHelper.complete(filename, dir); } catch (FileNotFoundException e) { return ResponseEntityBuilder.notFound("file not found."); diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/RestBaseController.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/RestBaseController.java index 17f7187e24..ef660a5e28 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/RestBaseController.java +++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/RestBaseController.java @@ -24,12 +24,14 @@ import org.apache.doris.common.Config; import org.apache.doris.httpv2.controller.BaseController; import org.apache.doris.httpv2.entity.ResponseEntityBuilder; import org.apache.doris.httpv2.exception.UnauthorizedException; +import org.apache.doris.master.MetaHelper; import org.apache.doris.qe.ConnectContext; import org.apache.doris.system.SystemInfoService; import org.apache.doris.thrift.TNetworkAddress; import com.google.common.base.Preconditions; import com.google.common.base.Strings; +import org.apache.commons.codec.digest.DigestUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.springframework.http.HttpStatus; @@ -175,7 +177,8 @@ public class RestBaseController extends BaseController { Preconditions.checkArgument(imageFile != null && imageFile.exists()); response.setHeader("Content-type", "application/octet-stream"); response.addHeader("Content-Disposition", "attachment;fileName=" + imageFile.getName()); - response.setHeader("X-Image-Size", imageFile.length() + ""); + response.setHeader(MetaHelper.X_IMAGE_SIZE, imageFile.length() + ""); + response.setHeader(MetaHelper.X_IMAGE_MD5, DigestUtils.md5Hex(new FileInputStream(imageFile))); getFile(request, response, imageFile, imageFile.getName()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/HttpUtils.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/HttpUtils.java index 3a065ee0ec..8caab8df2d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/HttpUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/manager/HttpUtils.java @@ -49,6 +49,7 @@ import javax.servlet.http.HttpServletRequest; */ public class HttpUtils { static final int REQUEST_SUCCESS_CODE = 0; + static final int DEFAULT_TIME_OUT_MS = 2000; static List> getFeList() { return Env.getCurrentEnv().getFrontends(null) @@ -74,12 +75,16 @@ public class HttpUtils { return url.toString(); } - static String doGet(String url, Map headers) throws IOException { + public static String doGet(String url, Map headers, int timeoutMs) throws IOException { HttpGet httpGet = new HttpGet(url); - setRequestConfig(httpGet, headers); + setRequestConfig(httpGet, headers, timeoutMs); return executeRequest(httpGet); } + public static String doGet(String url, Map headers) throws IOException { + return doGet(url, headers, DEFAULT_TIME_OUT_MS); + } + static String doPost(String url, Map headers, Object body) throws IOException { HttpPost httpPost = new HttpPost(url); if (Objects.nonNull(body)) { @@ -88,11 +93,11 @@ public class HttpUtils { httpPost.setEntity(stringEntity); } - setRequestConfig(httpPost, headers); + setRequestConfig(httpPost, headers, DEFAULT_TIME_OUT_MS); return executeRequest(httpPost); } - private static void setRequestConfig(HttpRequestBase request, Map headers) { + private static void setRequestConfig(HttpRequestBase request, Map headers, int timeoutMs) { if (null != headers) { for (String key : headers.keySet()) { request.setHeader(key, headers.get(key)); @@ -100,9 +105,9 @@ public class HttpUtils { } RequestConfig config = RequestConfig.custom() - .setConnectTimeout(2000) - .setConnectionRequestTimeout(2000) - .setSocketTimeout(2000) + .setConnectTimeout(timeoutMs) + .setConnectionRequestTimeout(timeoutMs) + .setSocketTimeout(timeoutMs) .build(); request.setConfig(config); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/Checkpoint.java b/fe/fe-core/src/main/java/org/apache/doris/master/Checkpoint.java index 94e108f69d..6aa640f840 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/master/Checkpoint.java +++ b/fe/fe-core/src/main/java/org/apache/doris/master/Checkpoint.java @@ -24,6 +24,8 @@ import org.apache.doris.common.FeConstants; import org.apache.doris.common.util.HttpURLUtil; import org.apache.doris.common.util.MasterDaemon; import org.apache.doris.common.util.NetUtils; +import org.apache.doris.httpv2.entity.ResponseBody; +import org.apache.doris.httpv2.rest.RestApiStatusCode; import org.apache.doris.metric.MetricRepo; import org.apache.doris.monitor.jvm.JvmService; import org.apache.doris.monitor.jvm.JvmStats; @@ -201,8 +203,12 @@ public class Checkpoint extends MasterDaemon { LOG.info("Put image:{}", url); try { - MetaHelper.getRemoteFile(url, PUT_TIMEOUT_SECOND * 1000, new NullOutputStream()); - successPushed++; + ResponseBody responseBody = MetaHelper.doGet(url, PUT_TIMEOUT_SECOND * 1000, Object.class); + if (responseBody.getCode() == RestApiStatusCode.OK.code) { + successPushed++; + } else { + LOG.warn("Failed when pushing image file. url = {},responseBody = {}", url, responseBody); + } } catch (IOException e) { LOG.error("Exception when pushing image file. url = {}", url, e); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/master/MetaHelper.java b/fe/fe-core/src/main/java/org/apache/doris/master/MetaHelper.java index def86db16f..e1de241058 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/master/MetaHelper.java +++ b/fe/fe-core/src/main/java/org/apache/doris/master/MetaHelper.java @@ -20,18 +20,27 @@ package org.apache.doris.master; import org.apache.doris.catalog.Env; import org.apache.doris.common.io.IOUtils; import org.apache.doris.common.util.HttpURLUtil; +import org.apache.doris.httpv2.entity.ResponseBody; +import org.apache.doris.httpv2.rest.manager.HttpUtils; +import org.apache.doris.persist.gson.GsonUtils; + +import org.apache.commons.codec.digest.DigestUtils; import java.io.BufferedInputStream; +import java.io.BufferedReader; import java.io.File; +import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOException; +import java.io.InputStreamReader; import java.io.OutputStream; import java.net.HttpURLConnection; public class MetaHelper { private static final String PART_SUFFIX = ".part"; public static final String X_IMAGE_SIZE = "X-Image-Size"; + public static final String X_IMAGE_MD5 = "X-Image-Md5"; private static final int BUFFER_BYTES = 8 * 1024; private static final int CHECKPOINT_LIMIT_BYTES = 30 * 1024 * 1024; @@ -60,11 +69,20 @@ public class MetaHelper { return new FileOutputStream(file); } + public static File getFile(String filename, File dir) { + return new File(dir, filename + MetaHelper.PART_SUFFIX); + } + + public static ResponseBody doGet(String url, int timeout, Class clazz) throws IOException { + String response = HttpUtils.doGet(url, HttpURLUtil.getNodeIdentHeaders(), timeout); + return parseResponse(response, clazz); + } + // download file from remote node - public static void getRemoteFile(String urlStr, int timeout, OutputStream out) + public static void getRemoteFile(String urlStr, int timeout, File file) throws IOException { HttpURLConnection conn = null; - + OutputStream out = new FileOutputStream(file); try { conn = HttpURLUtil.getConnectionWithNodeIdent(urlStr); conn.setConnectTimeout(timeout); @@ -76,7 +94,10 @@ public class MetaHelper { if (imageSizeStr != null) { imageSize = Long.parseLong(imageSizeStr); } - + if (imageSize < 0) { + throw new IOException(getResponse(conn)); + } + String remoteMd5 = conn.getHeaderField(X_IMAGE_MD5); BufferedInputStream bin = new BufferedInputStream(conn.getInputStream()); // Do not limit speed in client side. @@ -85,6 +106,14 @@ public class MetaHelper { if ((imageSize > 0) && (bytes != imageSize)) { throw new IOException("Unexpected image size, expected: " + imageSize + ", actual: " + bytes); } + + // if remoteMd5 not null ,we need check md5 + if (remoteMd5 != null) { + String localMd5 = DigestUtils.md5Hex(new FileInputStream(file)); + if (!remoteMd5.equals(localMd5)) { + throw new IOException("Unexpected image md5, expected: " + remoteMd5 + ", actual: " + localMd5); + } + } } finally { if (conn != null) { conn.disconnect(); @@ -95,4 +124,23 @@ public class MetaHelper { } } + public static String getResponse(HttpURLConnection conn) throws IOException { + String response; + try (BufferedReader bufferedReader = new BufferedReader( + new InputStreamReader(conn.getInputStream()))) { + String line; + StringBuilder sb = new StringBuilder(); + while ((line = bufferedReader.readLine()) != null) { + sb.append(line); + } + response = sb.toString(); + } + return response; + } + + public static ResponseBody parseResponse(String response, Class clazz) { + return GsonUtils.GSON.fromJson(response, + com.google.gson.internal.$Gson$Types.newParameterizedTypeWithOwner(null, ResponseBody.class, clazz)); + } + } diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/StorageInfo.java b/fe/fe-core/src/main/java/org/apache/doris/persist/StorageInfo.java index 114c31695d..7b2d35d761 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/StorageInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/StorageInfo.java @@ -17,6 +17,8 @@ package org.apache.doris.persist; +import java.util.Objects; + /** * This class is designed for sending storage information from master to standby master. * StorageInfo is easier to serialize to a Json String than class Storage @@ -59,4 +61,23 @@ public class StorageInfo { public void setImageSeq(long imageSeq) { this.imageSeq = imageSeq; } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + StorageInfo that = (StorageInfo) o; + return clusterID == that.clusterID + && imageSeq == that.imageSeq + && editsSeq == that.editsSeq; + } + + @Override + public int hashCode() { + return Objects.hash(clusterID, imageSeq, editsSeq); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/StorageInfoV2.java b/fe/fe-core/src/main/java/org/apache/doris/persist/StorageInfoV2.java deleted file mode 100644 index 8f1bba717c..0000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/StorageInfoV2.java +++ /dev/null @@ -1,30 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.persist; - -/** - * This class is wrapper of StorageInfo. - * Because for http v2, the response body of "/info" api changed to: - * {"msg":"success","code":0,"data":{"clusterID":1464655034,"imageSeq":809779,"editsSeq":0},"count":0} - */ -public class StorageInfoV2 { - public String msg; - public int code; - public StorageInfo data; - public int count; -} diff --git a/fe/fe-core/src/test/java/org/apache/doris/master/MetaHelperTest.java b/fe/fe-core/src/test/java/org/apache/doris/master/MetaHelperTest.java new file mode 100644 index 0000000000..070979494b --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/master/MetaHelperTest.java @@ -0,0 +1,52 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.master; + +import org.apache.doris.httpv2.entity.ResponseBody; +import org.apache.doris.httpv2.rest.RestApiStatusCode; +import org.apache.doris.persist.StorageInfo; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.junit.Assert; +import org.junit.Test; + +public class MetaHelperTest { + + @Test + public void test() throws JsonProcessingException { + ResponseBody bodyBefore = buildResponseBody(); + ObjectMapper mapper = new ObjectMapper(); + String bodyStr = mapper.writeValueAsString(bodyBefore); + ResponseBody bodyAfter = MetaHelper.parseResponse(bodyStr, StorageInfo.class); + Assert.assertEquals(bodyAfter, bodyBefore); + } + + private ResponseBody buildResponseBody() { + StorageInfo infoBefore = new StorageInfo(); + infoBefore.setClusterID(1); + infoBefore.setEditsSeq(2L); + infoBefore.setImageSeq(3L); + ResponseBody bodyBefore = new ResponseBody<>(); + bodyBefore.setCode(RestApiStatusCode.UNAUTHORIZED.code); + bodyBefore.setCount(5); + bodyBefore.setData(infoBefore); + bodyBefore.setMsg("msg"); + return bodyBefore; + } +}