diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java index 75c82f4096..9e62c1ccb5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java @@ -45,6 +45,7 @@ import org.apache.doris.thrift.TBrokerOperationStatusCode; import org.apache.doris.thrift.TBrokerPReadRequest; import org.apache.doris.thrift.TBrokerPWriteRequest; import org.apache.doris.thrift.TBrokerReadResponse; +import org.apache.doris.thrift.TBrokerRenamePathRequest; import org.apache.doris.thrift.TBrokerVersion; import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TPaloBrokerService; @@ -374,6 +375,30 @@ public class BrokerUtil { } } + public static void rename(String origFilePath, String destFilePath, BrokerDesc brokerDesc) throws UserException { + Pair pair = getBrokerAddressAndClient(brokerDesc); + TPaloBrokerService.Client client = pair.first; + TNetworkAddress address = pair.second; + boolean failed = true; + try { + TBrokerRenamePathRequest req = new TBrokerRenamePathRequest(TBrokerVersion.VERSION_ONE, origFilePath, + destFilePath, brokerDesc.getProperties()); + TBrokerOperationStatus rep = client.renamePath(req); + if (rep.getStatusCode() != TBrokerOperationStatusCode.OK) { + throw new UserException("failed to rename " + origFilePath + " to " + destFilePath + + ", msg: " + rep.getMessage() + ", broker: " + address); + } + failed = false; + } catch (TException e) { + LOG.warn("Broker rename file failed, origin path={}, dest path={}, address={}, exception={}", + origFilePath, destFilePath, address, e); + throw new UserException("Broker rename file exception. origin path=" + origFilePath + + ", dest path=" + destFilePath + ", broker=" + address); + } finally { + returnClient(client, address, failed); + } + } + public static Pair getBrokerAddressAndClient(BrokerDesc brokerDesc) throws UserException { Pair pair = new Pair(null, null); TNetworkAddress address = getAddress(brokerDesc); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkRepository.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkRepository.java index 1062092b8f..9073c151b3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkRepository.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkRepository.java @@ -172,24 +172,34 @@ public class SparkRepository { String srcFilePath = null; // upload dpp { + // 1. upload dpp srcFilePath = localDppPath; + String fileName = getFileName(PATH_DELIMITER, srcFilePath); + String origFilePath = remoteArchivePath + PATH_DELIMITER + + assemblyFileName(PREFIX_LIB, "", fileName, ""); + upload(srcFilePath, origFilePath); + // 2. rename dpp String md5sum = getMd5String(srcFilePath); long size = getFileSize(srcFilePath); - String fileName = getFileName(PATH_DELIMITER, srcFilePath); String destFilePath = remoteArchivePath + PATH_DELIMITER + assemblyFileName(PREFIX_LIB, md5sum, fileName, ""); - upload(srcFilePath, destFilePath); + rename(origFilePath, destFilePath); currentArchive.libraries.add(new SparkLibrary(destFilePath, md5sum, SparkLibrary.LibType.DPP, size)); } // upload spark2x { + // 1. upload spark2x srcFilePath = localSpark2xPath; + String fileName = getFileName(PATH_DELIMITER, srcFilePath); + String origFilePath = remoteArchivePath + PATH_DELIMITER + + assemblyFileName(PREFIX_LIB, "", fileName, ""); + upload(srcFilePath, origFilePath); + // 2. rename spark2x String md5sum = getMd5String(srcFilePath); long size = getFileSize(srcFilePath); - String fileName = getFileName(PATH_DELIMITER, srcFilePath); String destFilePath = remoteArchivePath + PATH_DELIMITER + assemblyFileName(PREFIX_LIB, md5sum, fileName, ""); - upload(srcFilePath, destFilePath); + rename(origFilePath, destFilePath); currentArchive.libraries.add(new SparkLibrary(destFilePath, md5sum, SparkLibrary.LibType.SPARK2X, size)); } LOG.info("finished to upload archive to repository, currentDppVersion={}, path={}", @@ -217,6 +227,9 @@ public class SparkRepository { continue; } String md5sum = lib_arg[0]; + if (Strings.isNullOrEmpty(md5sum)) { + continue; + } String type = lib_arg[1]; SparkLibrary.LibType libType = null; switch (type) { @@ -271,6 +284,16 @@ public class SparkRepository { } } + private void rename(String origFilePath, String destFilePath) throws LoadException { + try { + BrokerUtil.rename(origFilePath, destFilePath, brokerDesc); + LOG.info("finished to rename file, originPath={}, destPath={}", origFilePath, destFilePath); + } catch (UserException e) { + throw new LoadException("failed to rename file from " + origFilePath + " to " + destFilePath + + ", message=" + e.getMessage()); + } + } + public SparkArchive getCurrentArchive() { return currentArchive; } diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkRepositoryTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkRepositoryTest.java index 70e5cf9e51..2bba7c745b 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkRepositoryTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkRepositoryTest.java @@ -119,7 +119,10 @@ public class SparkRepositoryTest { throws UserException { return false; } @Mock void writeFile(String srcFilePath, String destFilePath, BrokerDesc brokerDesc) - throws UserException { return;} + throws UserException { return; } + @Mock + void rename(String origFilePath, String destFilePath, BrokerDesc brokerDesc) + throws UserException { return; } }; BrokerDesc brokerDesc = new BrokerDesc("broker", Maps.newHashMap()); @@ -179,7 +182,10 @@ public class SparkRepositoryTest { throws UserException { return; } @Mock void writeFile(String srcFilePath, String destFilePath, BrokerDesc brokerDesc) - throws UserException { return;} + throws UserException { return; } + @Mock + void rename(String origFilePath, String destFilePath, BrokerDesc brokerDesc) + throws UserException { return; } }; // new md5dum of local library