Support setting exec_mem_limit in ExportJob (#1280)
This commit is contained in:
@ -607,7 +607,7 @@ under the License.
|
||||
<goal>exec</goal>
|
||||
</goals>
|
||||
<configuration>
|
||||
<executable>java</executable>
|
||||
<executable>${env.JAVA_HOME}/bin/java</executable>
|
||||
<arguments>
|
||||
<argument>-jar</argument>
|
||||
<argument>${settings.localRepository}/com/baidu/jprotobuf/${jprotobuf.version}/jprotobuf-${jprotobuf.version}-jar-with-dependencies.jar</argument>
|
||||
|
||||
@ -23,6 +23,7 @@ import org.apache.doris.catalog.FsBroker;
|
||||
import org.apache.doris.catalog.Partition;
|
||||
import org.apache.doris.catalog.Table;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.common.DdlException;
|
||||
import org.apache.doris.common.ErrorCode;
|
||||
import org.apache.doris.common.ErrorReport;
|
||||
import org.apache.doris.common.UserException;
|
||||
@ -34,6 +35,7 @@ import org.apache.doris.qe.ConnectContext;
|
||||
import com.google.common.base.Joiner;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.base.Strings;
|
||||
import com.google.common.collect.Maps;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
@ -58,7 +60,7 @@ public class ExportStmt extends StatementBase {
|
||||
private List<String> partitions;
|
||||
private final String path;
|
||||
private final BrokerDesc brokerDesc;
|
||||
private final Map<String, String> properties;
|
||||
private Map<String, String> properties = Maps.newHashMap();
|
||||
private String columnSeparator;
|
||||
private String lineDelimiter;
|
||||
|
||||
@ -68,7 +70,9 @@ public class ExportStmt extends StatementBase {
|
||||
Map<String, String> properties, BrokerDesc brokerDesc) {
|
||||
this.tableRef = tableRef;
|
||||
this.path = path.trim();
|
||||
this.properties = properties;
|
||||
if (properties != null) {
|
||||
this.properties = properties;
|
||||
}
|
||||
this.brokerDesc = brokerDesc;
|
||||
this.columnSeparator = DEFAULT_COLUMN_SEPARATOR;
|
||||
this.lineDelimiter = DEFAULT_LINE_DELIMITER;
|
||||
@ -221,10 +225,20 @@ public class ExportStmt extends StatementBase {
|
||||
throw new AnalysisException("Invalid export path. please use valid 'HDFS://', 'AFS://' or 'BOS://' path.");
|
||||
}
|
||||
|
||||
private void checkProperties(Map<String, String> properties) throws AnalysisException {
|
||||
private void checkProperties(Map<String, String> properties) throws UserException {
|
||||
this.columnSeparator = PropertyAnalyzer.analyzeColumnSeparator(
|
||||
properties, ExportStmt.DEFAULT_COLUMN_SEPARATOR);
|
||||
this.lineDelimiter = PropertyAnalyzer.analyzeLineDelimiter(properties, ExportStmt.DEFAULT_LINE_DELIMITER);
|
||||
if (properties.containsKey(LoadStmt.EXEC_MEM_LIMIT)) {
|
||||
try {
|
||||
Long.parseLong(properties.get(LoadStmt.EXEC_MEM_LIMIT));
|
||||
} catch (NumberFormatException e) {
|
||||
throw new DdlException("Execute memory limit is not Long", e);
|
||||
}
|
||||
} else {
|
||||
properties.put(LoadStmt.EXEC_MEM_LIMIT,
|
||||
String.valueOf(ConnectContext.get().getSessionVariable().getMaxExecMemByte()));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@ -35,5 +35,5 @@ public class FeConstants {
|
||||
|
||||
// general model
|
||||
// Current meta data version. Use this version to write journals and image
|
||||
public static int meta_version = FeMetaVersion.VERSION_52;
|
||||
public static int meta_version = FeMetaVersion.VERSION_53;
|
||||
}
|
||||
|
||||
@ -114,4 +114,6 @@ public final class FeMetaVersion {
|
||||
public static final int VERSION_51 = 51;
|
||||
// small files
|
||||
public static final int VERSION_52 = 52;
|
||||
// Support exec_mem_limit in ExportJob
|
||||
public static final int VERSION_53 = 53;
|
||||
}
|
||||
|
||||
@ -23,6 +23,7 @@ import org.apache.doris.analysis.BrokerDesc;
|
||||
import org.apache.doris.analysis.DescriptorTable;
|
||||
import org.apache.doris.analysis.ExportStmt;
|
||||
import org.apache.doris.analysis.Expr;
|
||||
import org.apache.doris.analysis.LoadStmt;
|
||||
import org.apache.doris.analysis.SlotDescriptor;
|
||||
import org.apache.doris.analysis.SlotRef;
|
||||
import org.apache.doris.analysis.TableName;
|
||||
@ -103,6 +104,7 @@ public class ExportJob implements Writable {
|
||||
private String exportPath;
|
||||
private String columnSeparator;
|
||||
private String lineDelimiter;
|
||||
private Map<String, String> properties = Maps.newHashMap();
|
||||
private List<String> partitions;
|
||||
|
||||
private TableName tableName;
|
||||
@ -181,6 +183,7 @@ public class ExportJob implements Writable {
|
||||
|
||||
this.columnSeparator = stmt.getColumnSeparator();
|
||||
this.lineDelimiter = stmt.getLineDelimiter();
|
||||
this.properties = stmt.getProperties();
|
||||
|
||||
String path = stmt.getPath();
|
||||
Preconditions.checkArgument(!Strings.isNullOrEmpty(path));
|
||||
@ -294,6 +297,7 @@ public class ExportJob implements Writable {
|
||||
TUniqueId queryId = new TUniqueId(uuid.getMostSignificantBits() + i, uuid.getLeastSignificantBits());
|
||||
Coordinator coord = new Coordinator(
|
||||
id, queryId, desc, Lists.newArrayList(fragment), Lists.newArrayList(scanNode), clusterName);
|
||||
coord.setExecMemoryLimit(getExecMemLimit());
|
||||
coords.add(coord);
|
||||
this.coordList.add(coord);
|
||||
}
|
||||
@ -394,6 +398,10 @@ public class ExportJob implements Writable {
|
||||
return this.lineDelimiter;
|
||||
}
|
||||
|
||||
public long getExecMemLimit() {
|
||||
return Long.parseLong(properties.get(LoadStmt.EXEC_MEM_LIMIT));
|
||||
}
|
||||
|
||||
public List<String> getPartitions() {
|
||||
return partitions;
|
||||
}
|
||||
@ -553,6 +561,11 @@ public class ExportJob implements Writable {
|
||||
Text.writeString(out, exportPath);
|
||||
Text.writeString(out, columnSeparator);
|
||||
Text.writeString(out, lineDelimiter);
|
||||
out.writeInt(properties.size());
|
||||
for (Map.Entry<String, String> property : properties.entrySet()) {
|
||||
Text.writeString(out, property.getKey());
|
||||
Text.writeString(out, property.getValue());
|
||||
}
|
||||
|
||||
// partitions
|
||||
boolean hasPartition = (partitions != null);
|
||||
@ -595,6 +608,15 @@ public class ExportJob implements Writable {
|
||||
columnSeparator = Text.readString(in);
|
||||
lineDelimiter = Text.readString(in);
|
||||
|
||||
if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_53) {
|
||||
int count = in.readInt();
|
||||
for (int i = 0; i < count; i++) {
|
||||
String propertyKey = Text.readString(in);
|
||||
String propertyValue = Text.readString(in);
|
||||
this.properties.put(propertyKey, propertyValue);
|
||||
}
|
||||
}
|
||||
|
||||
boolean hasPartition = in.readBoolean();
|
||||
if (hasPartition) {
|
||||
partitions = Lists.newArrayList();
|
||||
|
||||
Reference in New Issue
Block a user