[Bug][SparkLoad] Divide the upload in spark repository into two steps (#4195)

When the FE uploads the spark archive, the broker may fail to write the file,
resulting in a bad (partially written) file being uploaded to the repository.

Therefore, to prevent Spark from reading bad files, we divide the upload into two steps:
first upload the file under a temporary name, then rename it to its final name
containing the MD5 value.
This commit is contained in:
xy720
2020-07-28 16:24:07 +08:00
committed by GitHub
parent 150f8e0e2b
commit 841f9cd07b
3 changed files with 60 additions and 6 deletions

View File

@ -45,6 +45,7 @@ import org.apache.doris.thrift.TBrokerOperationStatusCode;
import org.apache.doris.thrift.TBrokerPReadRequest;
import org.apache.doris.thrift.TBrokerPWriteRequest;
import org.apache.doris.thrift.TBrokerReadResponse;
import org.apache.doris.thrift.TBrokerRenamePathRequest;
import org.apache.doris.thrift.TBrokerVersion;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TPaloBrokerService;
@ -374,6 +375,30 @@ public class BrokerUtil {
}
}
/**
 * Renames {@code origFilePath} to {@code destFilePath} through the broker configured
 * in {@code brokerDesc}.
 *
 * <p>Borrows a broker client, issues a {@code renamePath} RPC, and returns the client
 * in a finally block; {@code failed} tells {@code returnClient} whether the connection
 * may be reused.
 *
 * @param origFilePath source path on the remote file system
 * @param destFilePath destination path on the remote file system
 * @param brokerDesc   broker address and access properties
 * @throws UserException if the broker reports a non-OK status or the RPC itself fails
 */
public static void rename(String origFilePath, String destFilePath, BrokerDesc brokerDesc) throws UserException {
    Pair<TPaloBrokerService.Client, TNetworkAddress> pair = getBrokerAddressAndClient(brokerDesc);
    TPaloBrokerService.Client client = pair.first;
    TNetworkAddress address = pair.second;
    boolean failed = true;
    try {
        TBrokerRenamePathRequest req = new TBrokerRenamePathRequest(TBrokerVersion.VERSION_ONE, origFilePath,
                destFilePath, brokerDesc.getProperties());
        TBrokerOperationStatus rep = client.renamePath(req);
        if (rep.getStatusCode() != TBrokerOperationStatusCode.OK) {
            throw new UserException("failed to rename " + origFilePath + " to " + destFilePath +
                    ", msg: " + rep.getMessage() + ", broker: " + address);
        }
        failed = false;
    } catch (TException e) {
        // Pass the exception as the last argument (no placeholder) so the full stack
        // trace is logged, and chain it as the cause so it is not lost to callers.
        LOG.warn("Broker rename file failed, origin path={}, dest path={}, address={}",
                origFilePath, destFilePath, address, e);
        throw new UserException("Broker rename file exception. origin path=" + origFilePath +
                ", dest path=" + destFilePath + ", broker=" + address, e);
    } finally {
        returnClient(client, address, failed);
    }
}
public static Pair<TPaloBrokerService.Client, TNetworkAddress> getBrokerAddressAndClient(BrokerDesc brokerDesc) throws UserException {
Pair<TPaloBrokerService.Client, TNetworkAddress> pair = new Pair<TPaloBrokerService.Client, TNetworkAddress>(null, null);
TNetworkAddress address = getAddress(brokerDesc);

View File

@ -172,24 +172,34 @@ public class SparkRepository {
String srcFilePath = null;
// upload dpp
{
// 1. upload dpp
srcFilePath = localDppPath;
String fileName = getFileName(PATH_DELIMITER, srcFilePath);
String origFilePath = remoteArchivePath + PATH_DELIMITER +
assemblyFileName(PREFIX_LIB, "", fileName, "");
upload(srcFilePath, origFilePath);
// 2. rename dpp
String md5sum = getMd5String(srcFilePath);
long size = getFileSize(srcFilePath);
String fileName = getFileName(PATH_DELIMITER, srcFilePath);
String destFilePath = remoteArchivePath + PATH_DELIMITER +
assemblyFileName(PREFIX_LIB, md5sum, fileName, "");
upload(srcFilePath, destFilePath);
rename(origFilePath, destFilePath);
currentArchive.libraries.add(new SparkLibrary(destFilePath, md5sum, SparkLibrary.LibType.DPP, size));
}
// upload spark2x
{
// 1. upload spark2x
srcFilePath = localSpark2xPath;
String fileName = getFileName(PATH_DELIMITER, srcFilePath);
String origFilePath = remoteArchivePath + PATH_DELIMITER +
assemblyFileName(PREFIX_LIB, "", fileName, "");
upload(srcFilePath, origFilePath);
// 2. rename spark2x
String md5sum = getMd5String(srcFilePath);
long size = getFileSize(srcFilePath);
String fileName = getFileName(PATH_DELIMITER, srcFilePath);
String destFilePath = remoteArchivePath + PATH_DELIMITER +
assemblyFileName(PREFIX_LIB, md5sum, fileName, "");
upload(srcFilePath, destFilePath);
rename(origFilePath, destFilePath);
currentArchive.libraries.add(new SparkLibrary(destFilePath, md5sum, SparkLibrary.LibType.SPARK2X, size));
}
LOG.info("finished to upload archive to repository, currentDppVersion={}, path={}",
@ -217,6 +227,9 @@ public class SparkRepository {
continue;
}
String md5sum = lib_arg[0];
if (Strings.isNullOrEmpty(md5sum)) {
continue;
}
String type = lib_arg[1];
SparkLibrary.LibType libType = null;
switch (type) {
@ -271,6 +284,16 @@ public class SparkRepository {
}
}
/**
 * Renames a file in the remote repository via the broker.
 *
 * <p>Used to atomically publish an uploaded library: the file is first uploaded under a
 * temporary name and only renamed to its final (MD5-tagged) name once the upload succeeded.
 *
 * @param origFilePath the temporary remote path the file was uploaded to
 * @param destFilePath the final remote path, including the MD5 value
 * @throws LoadException if the broker rename fails
 */
private void rename(String origFilePath, String destFilePath) throws LoadException {
    try {
        BrokerUtil.rename(origFilePath, destFilePath, brokerDesc);
        LOG.info("finished to rename file, originPath={}, destPath={}", origFilePath, destFilePath);
    } catch (UserException e) {
        LoadException loadException = new LoadException("failed to rename file from " + origFilePath +
                " to " + destFilePath + ", message=" + e.getMessage());
        // Chain the broker failure as the cause so its stack trace is preserved.
        loadException.initCause(e);
        throw loadException;
    }
}
/** Returns the archive currently tracked by this repository. */
public SparkArchive getCurrentArchive() {
    return this.currentArchive;
}

View File

@ -119,7 +119,10 @@ public class SparkRepositoryTest {
throws UserException { return false; }
@Mock
void writeFile(String srcFilePath, String destFilePath, BrokerDesc brokerDesc)
throws UserException { return;}
throws UserException { return; }
@Mock
void rename(String origFilePath, String destFilePath, BrokerDesc brokerDesc)
throws UserException { return; }
};
BrokerDesc brokerDesc = new BrokerDesc("broker", Maps.newHashMap());
@ -179,7 +182,10 @@ public class SparkRepositoryTest {
throws UserException { return; }
@Mock
void writeFile(String srcFilePath, String destFilePath, BrokerDesc brokerDesc)
throws UserException { return;}
throws UserException { return; }
@Mock
void rename(String origFilePath, String destFilePath, BrokerDesc brokerDesc)
throws UserException { return; }
};
// new md5dum of local library