[fix](export) fix timeout property not work for export job (#25913)

Co-authored-by: caiconghui1 <caiconghui1@jd.com>
This commit is contained in:
caiconghui
2023-10-26 18:51:57 +08:00
committed by GitHub
parent 2679fa4ea7
commit 678dc366e0
3 changed files with 36 additions and 5 deletions

View File

@ -58,7 +58,7 @@ import java.util.stream.Collectors;
// EXPORT TABLE table_name [PARTITION (name1[, ...])]
// TO 'export_target_path'
// [PROPERTIES("key"="value")]
// BY BROKER 'broker_name' [( $broker_attrs)]
// WITH BROKER 'broker_name' [( $broker_attrs)]
@Getter
public class ExportStmt extends StatementBase {
public static final String PARALLELISM = "parallelism";
@ -67,6 +67,7 @@ public class ExportStmt extends StatementBase {
private static final String DEFAULT_COLUMN_SEPARATOR = "\t";
private static final String DEFAULT_LINE_DELIMITER = "\n";
private static final String DEFAULT_PARALLELISM = "1";
private static final Integer DEFAULT_TIMEOUT = 7200;
private static final ImmutableSet<String> PROPERTIES_SET = new ImmutableSet.Builder<String>()
.add(LABEL)
@ -76,6 +77,7 @@ public class ExportStmt extends StatementBase {
.add(OutFileClause.PROP_DELETE_EXISTING_FILES)
.add(PropertyAnalyzer.PROPERTIES_COLUMN_SEPARATOR)
.add(PropertyAnalyzer.PROPERTIES_LINE_DELIMITER)
.add(PropertyAnalyzer.PROPERTIES_TIMEOUT)
.add("format")
.build();
@ -97,6 +99,8 @@ public class ExportStmt extends StatementBase {
private Integer parallelism;
private Integer timeout;
private String maxFileSize;
private String deleteExistingFiles;
private SessionVariable sessionVariables;
@ -118,6 +122,7 @@ public class ExportStmt extends StatementBase {
this.brokerDesc = brokerDesc;
this.columnSeparator = DEFAULT_COLUMN_SEPARATOR;
this.lineDelimiter = DEFAULT_LINE_DELIMITER;
this.timeout = DEFAULT_TIMEOUT;
Optional<SessionVariable> optionalSessionVariable = Optional.ofNullable(
ConnectContext.get().getSessionVariable());
@ -232,8 +237,10 @@ public class ExportStmt extends StatementBase {
// set sessions
exportJob.setQualifiedUser(this.qualifiedUser);
exportJob.setUserIdentity(this.userIdentity);
exportJob.setSessionVariables(this.sessionVariables);
exportJob.setTimeoutSecond(this.sessionVariables.getQueryTimeoutS());
SessionVariable clonedSessionVariable = VariableMgr.cloneSessionVariable(Optional.ofNullable(
ConnectContext.get().getSessionVariable()).orElse(VariableMgr.getDefaultSessionVariable()));
exportJob.setSessionVariables(clonedSessionVariable);
exportJob.setTimeoutSecond(this.timeout);
exportJob.setOrigStmt(this.getOrigStmt());
}
@ -323,6 +330,15 @@ public class ExportStmt extends StatementBase {
throw new UserException("The value of parallelism is invalid!");
}
// timeout
String timeoutString = properties.getOrDefault(PropertyAnalyzer.PROPERTIES_TIMEOUT,
String.valueOf(DEFAULT_TIMEOUT));
try {
this.timeout = Integer.parseInt(timeoutString);
} catch (NumberFormatException e) {
throw new UserException("The value of timeout is invalid!");
}
// max_file_size
this.maxFileSize = properties.getOrDefault(OutFileClause.PROP_MAX_FILE_SIZE, "");
this.deleteExistingFiles = properties.getOrDefault(OutFileClause.PROP_DELETE_EXISTING_FILES, "");

View File

@ -168,6 +168,7 @@ public class ExportTaskExecutor implements TransientTaskExecutor {
private AutoCloseConnectContext buildConnectContext() {
ConnectContext connectContext = new ConnectContext();
exportJob.getSessionVariables().setQueryTimeoutS(exportJob.getTimeoutSecond());
connectContext.setSessionVariable(exportJob.getSessionVariables());
connectContext.setEnv(Env.getCurrentEnv());
connectContext.setDatabase(exportJob.getTableName().getDb());

View File

@ -68,7 +68,7 @@ import java.util.stream.Collectors;
* EXPORT TABLE table_name [PARTITION (name1[, ...])]
* TO 'export_target_path'
* [PROPERTIES("key"="value")]
* BY BROKER 'broker_name' [( $broker_attrs)]
* WITH BROKER 'broker_name' [( $broker_attrs)]
*/
public class ExportCommand extends Command implements ForwardWithSync {
public static final String PARALLELISM = "parallelism";
@ -76,6 +76,8 @@ public class ExportCommand extends Command implements ForwardWithSync {
private static final String DEFAULT_COLUMN_SEPARATOR = "\t";
private static final String DEFAULT_LINE_DELIMITER = "\n";
private static final String DEFAULT_PARALLELISM = "1";
private static final Integer DEFAULT_TIMEOUT = 7200;
private static final ImmutableSet<String> PROPERTIES_SET = new ImmutableSet.Builder<String>()
.add(LABEL)
.add(PARALLELISM)
@ -84,6 +86,7 @@ public class ExportCommand extends Command implements ForwardWithSync {
.add(OutFileClause.PROP_DELETE_EXISTING_FILES)
.add(PropertyAnalyzer.PROPERTIES_COLUMN_SEPARATOR)
.add(PropertyAnalyzer.PROPERTIES_LINE_DELIMITER)
.add(PropertyAnalyzer.PROPERTIES_TIMEOUT)
.add("format")
.build();
@ -305,7 +308,18 @@ public class ExportCommand extends Command implements ForwardWithSync {
SessionVariable clonedSessionVariable = VariableMgr.cloneSessionVariable(Optional.ofNullable(
ConnectContext.get().getSessionVariable()).orElse(VariableMgr.getDefaultSessionVariable()));
exportJob.setSessionVariables(clonedSessionVariable);
exportJob.setTimeoutSecond(clonedSessionVariable.getQueryTimeoutS());
// set timeoutSecond
int timeoutSecond;
String timeoutString = fileProperties.getOrDefault(PropertyAnalyzer.PROPERTIES_TIMEOUT,
String.valueOf(DEFAULT_TIMEOUT));
try {
timeoutSecond = Integer.parseInt(timeoutString);
} catch (NumberFormatException e) {
throw new UserException("The value of timeout is invalid!");
}
exportJob.setTimeoutSecond(timeoutSecond);
// exportJob generate outfile sql
exportJob.generateOutfileLogicalPlans(RelationUtil.getQualifierName(ctx, this.nameParts));