[Fix](Export) Export deletes existing files multiple times when the `delete_existing_files` property is specified (#39304)

bp: #38400

When the `Export` statement specifies the `delete_existing_files`
property, each `Outfile` statement generated by the `Export` carries
this property. As a result, every `Outfile` statement deletes the
existing files in the target directory before writing, so only the
result of the last `Outfile` statement is retained.
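
To make the failure mode concrete, here is a minimal, self-contained Java sketch (hypothetical names, not Doris code) of what happens when every generated `Outfile` task clears the target directory before writing its own part file:

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class DeletePerOutfileDemo {
    public static void main(String[] args) {
        // Files currently present in the export target directory.
        Set<String> targetDir = new HashSet<>();
        // An Export job that was split into three Outfile tasks.
        List<String> outfileResults = List.of("part_0.csv", "part_1.csv", "part_2.csv");
        for (String result : outfileResults) {
            // Each Outfile carries delete_existing_files, so it first wipes the directory ...
            targetDir.clear();
            // ... and then writes only its own result file.
            targetDir.add(result);
        }
        // Only the last task's output survives: [part_2.csv]
        System.out.println(targetDir);
    }
}
```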

So we add an RPC method that can delete the existing files for the
`Export` statement, and the `Outfile` statements generated by the
`Export` no longer carry the `delete_existing_files` property.
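
A matching sketch of the fixed flow (same hypothetical names): the existing files are now deleted exactly once per `Export` job, before any `Outfile` task runs, so every task's output is kept. In the actual patch the one-time deletion happens on the FE side via the new `BrokerUtil.deleteDirectoryWithFileSystem` helper (see the `ExportMgr` hunk below).

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class DeleteOnceDemo {
    public static void main(String[] args) {
        Set<String> targetDir = new HashSet<>();
        // The target directory is cleared a single time, when the Export job is submitted.
        targetDir.clear();
        // The generated Outfile tasks no longer carry delete_existing_files; they only write.
        for (String result : List.of("part_0.csv", "part_1.csv", "part_2.csv")) {
            targetDir.add(result);
        }
        // All three results are retained.
        System.out.println(targetDir);
    }
}
```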

## Proposed changes

Issue Number: close #xxx

Author: Tiewei Fang
Date: 2024-08-13 22:26:02 +08:00 (committed by GitHub)
Parent: 6f1d9812bb
Commit: 187461e2fd
10 changed files with 34 additions and 32 deletions

@ -678,7 +678,7 @@ void PInternalServiceImpl::outfile_write_success(google::protobuf::RpcController
uint32_t len = request->result_file_sink().size();
st = deserialize_thrift_msg(buf, &len, false, &result_file_sink);
if (!st.ok()) {
LOG(WARNING) << "outfile write success filefailed, errmsg=" << st;
LOG(WARNING) << "outfile write success file failed, errmsg = " << st;
st.to_protobuf(result->mutable_status());
return;
}
@ -697,7 +697,7 @@ void PInternalServiceImpl::outfile_write_success(google::protobuf::RpcController
bool exists = true;
st = io::global_local_filesystem()->exists(file_name, &exists);
if (!st.ok()) {
LOG(WARNING) << "outfile write success filefailed, errmsg=" << st;
LOG(WARNING) << "outfile write success filefailed, errmsg = " << st;
st.to_protobuf(result->mutable_status());
return;
}
@ -705,7 +705,7 @@ void PInternalServiceImpl::outfile_write_success(google::protobuf::RpcController
st = Status::InternalError("File already exists: {}", file_name);
}
if (!st.ok()) {
LOG(WARNING) << "outfile write success filefailed, errmsg=" << st;
LOG(WARNING) << "outfile write success file failed, errmsg = " << st;
st.to_protobuf(result->mutable_status());
return;
}

@ -107,6 +107,16 @@ public class BrokerUtil {
}
}
public static void deleteDirectoryWithFileSystem(String path, BrokerDesc brokerDesc) throws UserException {
RemoteFileSystem fileSystem = FileSystemFactory.get(
brokerDesc.getName(), brokerDesc.getStorageType(), brokerDesc.getProperties());
Status st = fileSystem.deleteDirectory(path);
if (!st.ok()) {
throw new UserException(brokerDesc.getName() + " delete directory exception. path="
+ path + ", err: " + st.getErrMsg());
}
}
public static String printBroker(String brokerName, TNetworkAddress address) {
return brokerName + "[" + address.toString() + "]";
}
@ -358,7 +368,7 @@ public class BrokerUtil {
* @param brokerDesc
* @throws UserException if broker op failed
*/
public static void deletePath(String path, BrokerDesc brokerDesc) throws UserException {
public static void deletePathWithBroker(String path, BrokerDesc brokerDesc) throws UserException {
TNetworkAddress address = getAddress(brokerDesc);
TPaloBrokerService.Client client = borrowClient(address);
boolean failed = true;

@ -634,9 +634,7 @@ public class ExportJob implements Writable {
if (!maxFileSize.isEmpty()) {
outfileProperties.put(OutFileClause.PROP_MAX_FILE_SIZE, maxFileSize);
}
if (!deleteExistingFiles.isEmpty()) {
outfileProperties.put(OutFileClause.PROP_DELETE_EXISTING_FILES, deleteExistingFiles);
}
outfileProperties.put(OutFileClause.PROP_WITH_BOM, withBom);
// broker properties

@ -32,6 +32,7 @@ import org.apache.doris.common.FeConstants;
import org.apache.doris.common.LabelAlreadyUsedException;
import org.apache.doris.common.PatternMatcher;
import org.apache.doris.common.PatternMatcherWrapper;
import org.apache.doris.common.util.BrokerUtil;
import org.apache.doris.common.util.ListComparator;
import org.apache.doris.common.util.OrderByPair;
import org.apache.doris.common.util.TimeUtils;
@ -103,6 +104,15 @@ public class ExportMgr {
throw new LabelAlreadyUsedException(job.getLabel());
}
unprotectAddJob(job);
// delete existing files
if (Config.enable_delete_existing_files && Boolean.parseBoolean(job.getDeleteExistingFiles())) {
if (job.getBrokerDesc() == null) {
throw new AnalysisException("Local file system does not support delete existing files");
}
String fullPath = job.getExportPath();
BrokerUtil.deleteDirectoryWithFileSystem(fullPath.substring(0, fullPath.lastIndexOf('/') + 1),
job.getBrokerDesc());
}
job.getTaskExecutors().forEach(executor -> {
Long taskId = Env.getCurrentEnv().getTransientTaskManager().addMemoryTask(executor);
job.getTaskIdToExecutor().put(taskId, executor);

@ -365,7 +365,7 @@ public class SparkEtlJobHandler {
public void deleteEtlOutputPath(String outputPath, BrokerDesc brokerDesc) {
try {
BrokerUtil.deletePath(outputPath, brokerDesc);
BrokerUtil.deletePathWithBroker(outputPath, brokerDesc);
LOG.info("delete path success. path: {}", outputPath);
} catch (UserException e) {
LOG.warn("delete path failed. path: {}", outputPath, e);

@ -166,7 +166,7 @@ public class SparkRepository {
try {
String remoteArchivePath = getRemoteArchivePath(currentDppVersion);
if (isReplace) {
BrokerUtil.deletePath(remoteArchivePath, brokerDesc);
BrokerUtil.deletePathWithBroker(remoteArchivePath, brokerDesc);
currentArchive.libraries.clear();
}
String srcFilePath = null;

@ -153,16 +153,12 @@ import org.apache.doris.nereids.trees.plans.commands.insert.InsertOverwriteTable
import org.apache.doris.nereids.trees.plans.commands.insert.OlapInsertExecutor;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalSqlCache;
import org.apache.doris.planner.DataSink;
import org.apache.doris.planner.GroupCommitPlanner;
import org.apache.doris.planner.GroupCommitScanNode;
import org.apache.doris.planner.OlapScanNode;
import org.apache.doris.planner.OriginalPlanner;
import org.apache.doris.planner.PlanFragment;
import org.apache.doris.planner.PlanFragmentId;
import org.apache.doris.planner.PlanNode;
import org.apache.doris.planner.Planner;
import org.apache.doris.planner.ResultFileSink;
import org.apache.doris.planner.ScanNode;
import org.apache.doris.proto.Data;
import org.apache.doris.proto.InternalService;
@ -172,7 +168,6 @@ import org.apache.doris.proto.InternalService.POutfileWriteSuccessResult;
import org.apache.doris.proto.Types;
import org.apache.doris.qe.CommonResultSet.CommonResultSetMetaData;
import org.apache.doris.qe.ConnectContext.ConnectType;
import org.apache.doris.qe.Coordinator.FragmentExecParams;
import org.apache.doris.qe.QeProcessorImpl.QueryInfo;
import org.apache.doris.qe.QueryState.MysqlStateType;
import org.apache.doris.qe.cache.Cache;
@ -1892,26 +1887,18 @@ public class StmtExecutor {
TResultFileSinkOptions sinkOptions = outFileClause.toSinkOptions();
// 2. set brokerNetAddress
List<PlanFragment> fragments = coord.getFragments();
Map<PlanFragmentId, FragmentExecParams> fragmentExecParamsMap = coord.getFragmentExecParamsMap();
PlanFragmentId topId = fragments.get(0).getFragmentId();
FragmentExecParams topParams = fragmentExecParamsMap.get(topId);
DataSink topDataSink = topParams.fragment.getSink();
TNetworkAddress execBeAddr = topParams.instanceExecParams.get(0).host;
if (topDataSink instanceof ResultFileSink
&& ((ResultFileSink) topDataSink).getStorageType() == StorageBackend.StorageType.BROKER) {
StorageType storageType = outFileClause.getBrokerDesc() == null
? StorageBackend.StorageType.LOCAL : outFileClause.getBrokerDesc().getStorageType();
if (storageType == StorageType.BROKER) {
// set the broker address for OUTFILE sink
ResultFileSink topResultFileSink = (ResultFileSink) topDataSink;
FsBroker broker = Env.getCurrentEnv().getBrokerMgr()
.getBroker(topResultFileSink.getBrokerName(), execBeAddr.getHostname());
String brokerName = outFileClause.getBrokerDesc().getName();
FsBroker broker = Env.getCurrentEnv().getBrokerMgr().getAnyBroker(brokerName);
sinkOptions.setBrokerAddresses(Lists.newArrayList(new TNetworkAddress(broker.host, broker.port)));
}
// 3. set TResultFileSink properties
TResultFileSink sink = new TResultFileSink();
sink.setFileOptions(sinkOptions);
StorageType storageType = outFileClause.getBrokerDesc() == null
? StorageBackend.StorageType.LOCAL : outFileClause.getBrokerDesc().getStorageType();
sink.setStorageBackendType(storageType.toThrift());
// 4. get BE

@ -318,7 +318,7 @@ public class BrokerUtilTest {
try {
BrokerDesc brokerDesc = new BrokerDesc("broker0", Maps.newHashMap());
BrokerUtil.deletePath("hdfs://127.0.0.1:10000/doris/jobs/1/label6/9", brokerDesc);
BrokerUtil.deletePathWithBroker("hdfs://127.0.0.1:10000/doris/jobs/1/label6/9", brokerDesc);
} catch (Exception e) {
Assert.fail(e.getMessage());
}

@ -418,7 +418,7 @@ public class SparkEtlJobHandlerTest {
public void testDeleteEtlOutputPath(@Mocked BrokerUtil brokerUtil) throws UserException {
new Expectations() {
{
BrokerUtil.deletePath(etlOutputPath, (BrokerDesc) any);
BrokerUtil.deletePathWithBroker(etlOutputPath, (BrokerDesc) any);
times = 1;
}
};

@ -688,9 +688,6 @@ message PFetchTableSchemaResult {
}
message POutfileWriteSuccessRequest {
// optional string file_path = 1;
// optional string success_file_name = 2;
// map<string, string> broker_properties = 4; // only for remote file
optional bytes result_file_sink = 1;
}