From 8b79abcaba2aa453c8564a7b18669aa49f482423 Mon Sep 17 00:00:00 2001 From: lichaoyong Date: Tue, 11 Jun 2019 21:05:45 +0800 Subject: [PATCH] Support setting exec_mem_limit in ExportJob (#1280) --- fe/pom.xml | 2 +- .../org/apache/doris/analysis/ExportStmt.java | 20 ++++++++++++++--- .../org/apache/doris/common/FeConstants.java | 2 +- .../apache/doris/common/FeMetaVersion.java | 2 ++ .../java/org/apache/doris/load/ExportJob.java | 22 +++++++++++++++++++ 5 files changed, 43 insertions(+), 5 deletions(-) diff --git a/fe/pom.xml b/fe/pom.xml index ab67ab1584..2f08e3cd59 100644 --- a/fe/pom.xml +++ b/fe/pom.xml @@ -607,7 +607,7 @@ under the License. exec - java + ${env.JAVA_HOME}/bin/java -jar ${settings.localRepository}/com/baidu/jprotobuf/${jprotobuf.version}/jprotobuf-${jprotobuf.version}-jar-with-dependencies.jar diff --git a/fe/src/main/java/org/apache/doris/analysis/ExportStmt.java b/fe/src/main/java/org/apache/doris/analysis/ExportStmt.java index 61b1417f5f..4ad1455d39 100644 --- a/fe/src/main/java/org/apache/doris/analysis/ExportStmt.java +++ b/fe/src/main/java/org/apache/doris/analysis/ExportStmt.java @@ -23,6 +23,7 @@ import org.apache.doris.catalog.FsBroker; import org.apache.doris.catalog.Partition; import org.apache.doris.catalog.Table; import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.DdlException; import org.apache.doris.common.ErrorCode; import org.apache.doris.common.ErrorReport; import org.apache.doris.common.UserException; @@ -34,6 +35,7 @@ import org.apache.doris.qe.ConnectContext; import com.google.common.base.Joiner; import com.google.common.base.Preconditions; import com.google.common.base.Strings; +import com.google.common.collect.Maps; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -58,7 +60,7 @@ public class ExportStmt extends StatementBase { private List partitions; private final String path; private final BrokerDesc brokerDesc; - private final Map properties; + private Map properties = Maps.newHashMap(); private String columnSeparator; private String lineDelimiter; @@ -68,7 +70,9 @@ public class ExportStmt extends StatementBase { Map properties, BrokerDesc brokerDesc) { this.tableRef = tableRef; this.path = path.trim(); - this.properties = properties; + if (properties != null) { + this.properties = properties; + } this.brokerDesc = brokerDesc; this.columnSeparator = DEFAULT_COLUMN_SEPARATOR; this.lineDelimiter = DEFAULT_LINE_DELIMITER; @@ -221,10 +225,20 @@ public class ExportStmt extends StatementBase { throw new AnalysisException("Invalid export path. please use valid 'HDFS://', 'AFS://' or 'BOS://' path."); } - private void checkProperties(Map properties) throws AnalysisException { + private void checkProperties(Map properties) throws UserException { this.columnSeparator = PropertyAnalyzer.analyzeColumnSeparator( properties, ExportStmt.DEFAULT_COLUMN_SEPARATOR); this.lineDelimiter = PropertyAnalyzer.analyzeLineDelimiter(properties, ExportStmt.DEFAULT_LINE_DELIMITER); + if (properties.containsKey(LoadStmt.EXEC_MEM_LIMIT)) { + try { + Long.parseLong(properties.get(LoadStmt.EXEC_MEM_LIMIT)); + } catch (NumberFormatException e) { + throw new DdlException("Execute memory limit is not Long", e); + } + } else { + properties.put(LoadStmt.EXEC_MEM_LIMIT, + String.valueOf(ConnectContext.get().getSessionVariable().getMaxExecMemByte())); + } } @Override diff --git a/fe/src/main/java/org/apache/doris/common/FeConstants.java b/fe/src/main/java/org/apache/doris/common/FeConstants.java index 74850a52d2..d52173095f 100644 --- a/fe/src/main/java/org/apache/doris/common/FeConstants.java +++ b/fe/src/main/java/org/apache/doris/common/FeConstants.java @@ -35,5 +35,5 @@ public class FeConstants { // general model // Current meta data version. Use this version to write journals and image - public static int meta_version = FeMetaVersion.VERSION_52; + public static int meta_version = FeMetaVersion.VERSION_53; } diff --git a/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java b/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java index 4b64d79cac..0532ef763c 100644 --- a/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java +++ b/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java @@ -114,4 +114,6 @@ public final class FeMetaVersion { public static final int VERSION_51 = 51; // small files public static final int VERSION_52 = 52; + // Support exec_mem_limit in ExportJob + public static final int VERSION_53 = 53; } diff --git a/fe/src/main/java/org/apache/doris/load/ExportJob.java b/fe/src/main/java/org/apache/doris/load/ExportJob.java index 2f5ecef6c2..44f1a71ddb 100644 --- a/fe/src/main/java/org/apache/doris/load/ExportJob.java +++ b/fe/src/main/java/org/apache/doris/load/ExportJob.java @@ -23,6 +23,7 @@ import org.apache.doris.analysis.BrokerDesc; import org.apache.doris.analysis.DescriptorTable; import org.apache.doris.analysis.ExportStmt; import org.apache.doris.analysis.Expr; +import org.apache.doris.analysis.LoadStmt; import org.apache.doris.analysis.SlotDescriptor; import org.apache.doris.analysis.SlotRef; import org.apache.doris.analysis.TableName; @@ -103,6 +104,7 @@ public class ExportJob implements Writable { private String exportPath; private String columnSeparator; private String lineDelimiter; + private Map properties = Maps.newHashMap(); private List partitions; private TableName tableName; @@ -181,6 +183,7 @@ public class ExportJob implements Writable { this.columnSeparator = stmt.getColumnSeparator(); this.lineDelimiter = stmt.getLineDelimiter(); + this.properties = stmt.getProperties(); String path = stmt.getPath(); Preconditions.checkArgument(!Strings.isNullOrEmpty(path)); @@ -294,6 +297,7 @@ public class ExportJob implements Writable { TUniqueId queryId = new TUniqueId(uuid.getMostSignificantBits() + i, uuid.getLeastSignificantBits()); Coordinator coord = new Coordinator( id, queryId, desc, Lists.newArrayList(fragment), Lists.newArrayList(scanNode), clusterName); + coord.setExecMemoryLimit(getExecMemLimit()); coords.add(coord); this.coordList.add(coord); } @@ -394,6 +398,10 @@ public class ExportJob implements Writable { return this.lineDelimiter; } + public long getExecMemLimit() { + return Long.parseLong(properties.get(LoadStmt.EXEC_MEM_LIMIT)); + } + public List getPartitions() { return partitions; } @@ -553,6 +561,11 @@ public class ExportJob implements Writable { Text.writeString(out, exportPath); Text.writeString(out, columnSeparator); Text.writeString(out, lineDelimiter); + out.writeInt(properties.size()); + for (Map.Entry property : properties.entrySet()) { + Text.writeString(out, property.getKey()); + Text.writeString(out, property.getValue()); + } // partitions boolean hasPartition = (partitions != null); @@ -595,6 +608,15 @@ public class ExportJob implements Writable { columnSeparator = Text.readString(in); lineDelimiter = Text.readString(in); + if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_53) { + int count = in.readInt(); + for (int i = 0; i < count; i++) { + String propertyKey = Text.readString(in); + String propertyValue = Text.readString(in); + this.properties.put(propertyKey, propertyValue); + } + } + boolean hasPartition = in.readBoolean(); if (hasPartition) { partitions = Lists.newArrayList();