From 841f9cd07b3a85e77c8d29b756900d66cb58f368 Mon Sep 17 00:00:00 2001 From: xy720 <22125576+xy720@users.noreply.github.com> Date: Tue, 28 Jul 2020 16:24:07 +0800 Subject: [PATCH] [Bug][SparkLoad] Divide the upload in spark repository into two steps (#4195) When FE uploads the Spark archive, the broker may fail to write the file, resulting in a bad file being left in the repository. Therefore, to prevent Spark from reading bad files, the upload is divided into two steps. The first step uploads the file under a name that carries no MD5 value, and the second step renames it to the name that includes its MD5 value. Library entries with an empty MD5 value are skipped when the repository's libraries are parsed, so a file that was never renamed is not used. --- .../apache/doris/common/util/BrokerUtil.java | 25 +++++++++++++++ .../doris/load/loadv2/SparkRepository.java | 31 ++++++++++++++++--- .../load/loadv2/SparkRepositoryTest.java | 10 ++++-- 3 files changed, 60 insertions(+), 6 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java index 75c82f4096..9e62c1ccb5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java @@ -45,6 +45,7 @@ import org.apache.doris.thrift.TBrokerOperationStatusCode; import org.apache.doris.thrift.TBrokerPReadRequest; import org.apache.doris.thrift.TBrokerPWriteRequest; import org.apache.doris.thrift.TBrokerReadResponse; +import org.apache.doris.thrift.TBrokerRenamePathRequest; import org.apache.doris.thrift.TBrokerVersion; import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TPaloBrokerService; @@ -374,6 +375,30 @@ public class BrokerUtil { } } + public static void rename(String origFilePath, String destFilePath, BrokerDesc brokerDesc) throws UserException { + Pair pair = getBrokerAddressAndClient(brokerDesc); + TPaloBrokerService.Client client = pair.first; + TNetworkAddress address = pair.second; + boolean failed = true; + try { + TBrokerRenamePathRequest req = new 
TBrokerRenamePathRequest(TBrokerVersion.VERSION_ONE, origFilePath, + destFilePath, brokerDesc.getProperties()); + TBrokerOperationStatus rep = client.renamePath(req); + if (rep.getStatusCode() != TBrokerOperationStatusCode.OK) { + throw new UserException("failed to rename " + origFilePath + " to " + destFilePath + + ", msg: " + rep.getMessage() + ", broker: " + address); + } + failed = false; + } catch (TException e) { + LOG.warn("Broker rename file failed, origin path={}, dest path={}, address={}, exception={}", + origFilePath, destFilePath, address, e); + throw new UserException("Broker rename file exception. origin path=" + origFilePath + + ", dest path=" + destFilePath + ", broker=" + address); + } finally { + returnClient(client, address, failed); + } + } + public static Pair getBrokerAddressAndClient(BrokerDesc brokerDesc) throws UserException { Pair pair = new Pair(null, null); TNetworkAddress address = getAddress(brokerDesc); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkRepository.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkRepository.java index 1062092b8f..9073c151b3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkRepository.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkRepository.java @@ -172,24 +172,34 @@ public class SparkRepository { String srcFilePath = null; // upload dpp { + // 1. upload dpp srcFilePath = localDppPath; + String fileName = getFileName(PATH_DELIMITER, srcFilePath); + String origFilePath = remoteArchivePath + PATH_DELIMITER + + assemblyFileName(PREFIX_LIB, "", fileName, ""); + upload(srcFilePath, origFilePath); + // 2. 
rename dpp String md5sum = getMd5String(srcFilePath); long size = getFileSize(srcFilePath); - String fileName = getFileName(PATH_DELIMITER, srcFilePath); String destFilePath = remoteArchivePath + PATH_DELIMITER + assemblyFileName(PREFIX_LIB, md5sum, fileName, ""); - upload(srcFilePath, destFilePath); + rename(origFilePath, destFilePath); currentArchive.libraries.add(new SparkLibrary(destFilePath, md5sum, SparkLibrary.LibType.DPP, size)); } // upload spark2x { + // 1. upload spark2x srcFilePath = localSpark2xPath; + String fileName = getFileName(PATH_DELIMITER, srcFilePath); + String origFilePath = remoteArchivePath + PATH_DELIMITER + + assemblyFileName(PREFIX_LIB, "", fileName, ""); + upload(srcFilePath, origFilePath); + // 2. rename spark2x String md5sum = getMd5String(srcFilePath); long size = getFileSize(srcFilePath); - String fileName = getFileName(PATH_DELIMITER, srcFilePath); String destFilePath = remoteArchivePath + PATH_DELIMITER + assemblyFileName(PREFIX_LIB, md5sum, fileName, ""); - upload(srcFilePath, destFilePath); + rename(origFilePath, destFilePath); currentArchive.libraries.add(new SparkLibrary(destFilePath, md5sum, SparkLibrary.LibType.SPARK2X, size)); } LOG.info("finished to upload archive to repository, currentDppVersion={}, path={}", @@ -217,6 +227,9 @@ public class SparkRepository { continue; } String md5sum = lib_arg[0]; + if (Strings.isNullOrEmpty(md5sum)) { + continue; + } String type = lib_arg[1]; SparkLibrary.LibType libType = null; switch (type) { @@ -271,6 +284,16 @@ public class SparkRepository { } } + private void rename(String origFilePath, String destFilePath) throws LoadException { + try { + BrokerUtil.rename(origFilePath, destFilePath, brokerDesc); + LOG.info("finished to rename file, originPath={}, destPath={}", origFilePath, destFilePath); + } catch (UserException e) { + throw new LoadException("failed to rename file from " + origFilePath + " to " + destFilePath + + ", message=" + e.getMessage()); + } + } + public SparkArchive 
getCurrentArchive() { return currentArchive; } diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkRepositoryTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkRepositoryTest.java index 70e5cf9e51..2bba7c745b 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkRepositoryTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkRepositoryTest.java @@ -119,7 +119,10 @@ public class SparkRepositoryTest { throws UserException { return false; } @Mock void writeFile(String srcFilePath, String destFilePath, BrokerDesc brokerDesc) - throws UserException { return;} + throws UserException { return; } + @Mock + void rename(String origFilePath, String destFilePath, BrokerDesc brokerDesc) + throws UserException { return; } }; BrokerDesc brokerDesc = new BrokerDesc("broker", Maps.newHashMap()); @@ -179,7 +182,10 @@ public class SparkRepositoryTest { throws UserException { return; } @Mock void writeFile(String srcFilePath, String destFilePath, BrokerDesc brokerDesc) - throws UserException { return;} + throws UserException { return; } + @Mock + void rename(String origFilePath, String destFilePath, BrokerDesc brokerDesc) + throws UserException { return; } }; // new md5dum of local library