[Enhancement](Export) add property for outfile/export and add test (#18997)

This PR does three things:
1. Adds the `delete_existing_files` property for outfile/export. If `delete_existing_files = true`, export/outfile first deletes all files under file_path.
2. Adds a p2 test for export.
3. Updates the docs.
This commit is contained in:
Tiewei Fang
2023-05-08 14:02:20 +08:00
committed by GitHub
parent 8c4f3d4126
commit e78149cb65
20 changed files with 2045 additions and 227 deletions

View File

@ -81,6 +81,7 @@ public class ExportStmt extends StatementBase {
private String label;
// Raw text of the max_file_size property; "" when the property is absent.
private String maxFileSize;
// Raw text of the delete_existing_files property; "" when the property is absent.
// Kept as a String here, not parsed into a boolean.
private String deleteExistingFiles;
private SessionVariable sessionVariables;
private String qualifiedUser;
@ -338,6 +339,7 @@ public class ExportStmt extends StatementBase {
// max_file_size
this.maxFileSize = properties.getOrDefault(OutFileClause.PROP_MAX_FILE_SIZE, "");
// delete_existing_files (stored as the raw string; defaults to "" when not given)
this.deleteExistingFiles = properties.getOrDefault(OutFileClause.PROP_DELETE_EXISTING_FILES, "");
if (properties.containsKey(LABEL)) {
FeNameFormat.checkLabel(properties.get(LABEL));
@ -397,4 +399,8 @@ public class ExportStmt extends StatementBase {
/**
 * Returns the raw {@code max_file_size} property text held by this statement.
 *
 * @return the stored property value as a string
 */
public String getMaxFileSize() {
    return this.maxFileSize;
}
/**
 * Returns the raw {@code delete_existing_files} property text held by this statement.
 *
 * @return the stored property value as a string (not parsed into a boolean)
 */
public String getDeleteExistingFiles() {
    return this.deleteExistingFiles;
}
}

View File

@ -130,8 +130,8 @@ public class OutFileClause {
public static final String PROP_LINE_DELIMITER = "line_delimiter";
public static final String PROP_MAX_FILE_SIZE = "max_file_size";
private static final String PROP_SUCCESS_FILE_NAME = "success_file_name";
// Property key asking outfile/export to delete the files already under the target path first.
public static final String PROP_DELETE_EXISTING_FILES = "delete_existing_files";
private static final String PARQUET_PROP_PREFIX = "parquet.";
private static final String ORC_PROP_PREFIX = "orc.";
private static final String SCHEMA = "schema";
private static final long DEFAULT_MAX_FILE_SIZE_BYTES = 1 * 1024 * 1024 * 1024; // 1GB
@ -148,6 +148,7 @@ public class OutFileClause {
private String lineDelimiter = "\n";
private TFileFormatType fileFormatType;
private long maxFileSizeBytes = DEFAULT_MAX_FILE_SIZE_BYTES;
// Parsed delete_existing_files flag; false unless the property is supplied and enabled.
private boolean deleteExistingFiles = false;
private BrokerDesc brokerDesc = null;
// True if result is written to local disk.
// If set to true, the brokerDesc must be null.
@ -597,6 +598,12 @@ public class OutFileClause {
processedPropKeys.add(PROP_MAX_FILE_SIZE);
}
// delete_existing_files: the requested value only takes effect when
// Config.enable_delete_existing_files is also true; otherwise the flag
// silently stays false.
if (properties.containsKey(PROP_DELETE_EXISTING_FILES)) {
    // Boolean.parseBoolean yields true only for "true" (ignoring case).
    // Use the short-circuit && for boolean logic instead of the non-short-circuit &;
    // both operands are side-effect free, so the result is unchanged.
    deleteExistingFiles = Boolean.parseBoolean(properties.get(PROP_DELETE_EXISTING_FILES))
            && Config.enable_delete_existing_files;
    // Record the key in processedPropKeys, as is done for the other handled properties.
    processedPropKeys.add(PROP_DELETE_EXISTING_FILES);
}
if (properties.containsKey(PROP_SUCCESS_FILE_NAME)) {
successFileName = properties.get(PROP_SUCCESS_FILE_NAME);
FeNameFormat.checkCommonName("file name", successFileName);
@ -833,6 +840,7 @@ public class OutFileClause {
sinkOptions.setLineDelimiter(lineDelimiter);
}
sinkOptions.setMaxFileSizeBytes(maxFileSizeBytes);
sinkOptions.setDeleteExistingFiles(deleteExistingFiles);
if (brokerDesc != null) {
sinkOptions.setBrokerProperties(brokerDesc.getProperties());
// broker_addresses of sinkOptions will be set in Coordinator.

View File

@ -143,6 +143,8 @@ public class ExportJob implements Writable {
private int timeoutSecond;
@SerializedName("maxFileSize")
private String maxFileSize;
// Raw delete_existing_files property text (a string, not a boolean),
// serialized under the key "deleteExistingFiles".
@SerializedName("deleteExistingFiles")
private String deleteExistingFiles;
// progress has two functions at EXPORTING stage:
// 1. when progress < 100, it indicates exporting
// 2. set progress = 100 ONLY when exporting progress is completely done
@ -222,6 +224,7 @@ public class ExportJob implements Writable {
this.userIdentity = stmt.getUserIdentity();
this.format = stmt.getFormat();
this.maxFileSize = stmt.getMaxFileSize();
this.deleteExistingFiles = stmt.getDeleteExistingFiles();
this.partitions = stmt.getPartitions();
this.exportTable = db.getTableOrDdlException(stmt.getTblName().getTbl());
@ -287,6 +290,9 @@ public class ExportJob implements Writable {
// Forward the optional outfile properties only when a value was supplied.
// The null checks are defensive: both fields are populated by deserialization
// as well as by the statement, and a field absent from the serialized form
// is left null rather than "".
// NOTE(review): presumably relevant for jobs restored from metadata written
// before deleteExistingFiles existed - confirm against the replay path.
if (maxFileSize != null && !maxFileSize.isEmpty()) {
    outfileProperties.put(OutFileClause.PROP_MAX_FILE_SIZE, maxFileSize);
}
if (deleteExistingFiles != null && !deleteExistingFiles.isEmpty()) {
    outfileProperties.put(OutFileClause.PROP_DELETE_EXISTING_FILES, deleteExistingFiles);
}
// broker properties
// outfile clause's broker properties need 'broker.' prefix
@ -359,6 +365,10 @@ public class ExportJob implements Writable {
return maxFileSize;
}
/**
 * Returns the raw {@code delete_existing_files} property text stored on this job.
 *
 * @return the stored property value as a string (not parsed into a boolean)
 */
public String getDeleteExistingFiles() {
    return this.deleteExistingFiles;
}
/**
 * Returns the qualified user recorded on this job.
 *
 * @return the value of the {@code qualifiedUser} field
 */
public String getQualifiedUser() {
    return this.qualifiedUser;
}

View File

@ -420,6 +420,7 @@ public class ExportMgr extends MasterDaemon {
infoMap.put("columns", job.getColumns());
infoMap.put("tablet_num", job.getTabletLocations() == null ? -1 : job.getTabletLocations().size());
infoMap.put("max_file_size", job.getMaxFileSize());
infoMap.put("delete_existing_files", job.getDeleteExistingFiles());
jobInfo.add(new Gson().toJson(infoMap));
// path
jobInfo.add(job.getExportPath());