[Feature](Export) support parallel export job using Job Schedule (#22854)

Tiewei Fang
2023-08-18 22:24:42 +08:00
committed by GitHub
parent 6847592137
commit 10abbd2b62
23 changed files with 905 additions and 617 deletions

View File

@ -2102,6 +2102,11 @@ public class Config extends ConfigBase {
"The maximum parallelism allowed by Export job"})
public static int maximum_parallelism_of_export_job = 50;
@ConfField(mutable = true, description = {
"ExportExecutorTask任务中一个OutFile语句允许的最大tablets数量",
"The maximum number of tablets allowed by an OutfileStatement in an ExportExecutorTask"})
public static int maximum_tablets_of_outfile_in_export = 10;
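// Editor's note: together these two knobs bound the fan-out of a parallel export job:
// maximum_parallelism_of_export_job caps the number of ExportTaskExecutors, and
// maximum_tablets_of_outfile_in_export caps the tablets covered by a single outfile
// statement, so one executor issues roughly the following number of outfile statements
// (sketch only; the helper name is hypothetical):
//   static int outfileStmtsPerExecutor(int tabletsOfExecutor, int maxTabletsPerOutfile) {
//       return (tabletsOfExecutor + maxTabletsPerOutfile - 1) / maxTabletsPerOutfile; // ceil division
//   }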
@ConfField(mutable = true, description = {
"是否用 mysql 的 bigint 类型来返回 Doris 的 largeint 类型",
"Whether to use mysql's bigint type to return Doris's largeint type"})

View File

@ -21,8 +21,7 @@ import org.apache.doris.analysis.BinaryPredicate.Operator;
import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.UserException;
import org.apache.doris.load.ExportJob;
import org.apache.doris.load.ExportJob.JobState;
import org.apache.doris.load.ExportJobState;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableSet;
@ -83,11 +82,10 @@ public class CancelExportStmt extends DdlStmt {
throw new AnalysisException("Only label can use like");
}
state = inputValue;
ExportJob.JobState jobState = ExportJob.JobState.valueOf(state);
if (jobState != ExportJob.JobState.PENDING
&& jobState != JobState.IN_QUEUE
&& jobState != ExportJob.JobState.EXPORTING) {
throw new AnalysisException("Only support PENDING/IN_QUEUE/EXPORTING, invalid state: " + state);
ExportJobState jobState = ExportJobState.valueOf(state);
if (jobState != ExportJobState.PENDING
&& jobState != ExportJobState.EXPORTING) {
throw new AnalysisException("Only support PENDING/EXPORTING, invalid state: " + state);
}
}
}
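// Editor's note: with scheduler-based export, IN_QUEUE is kept only for replaying old
// metadata and is no longer accepted here, so a cancel statement must name one of the two
// remaining live states, e.g. (database name hypothetical):
//   CANCEL EXPORT FROM example_db WHERE STATE = "PENDING";
//   CANCEL EXPORT FROM example_db WHERE STATE = "EXPORTING";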

View File

@ -17,13 +17,13 @@
package org.apache.doris.analysis;
import org.apache.doris.catalog.BrokerMgr;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.FsBroker;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.Table;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.Config;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.FeNameFormat;
@ -32,6 +32,7 @@ import org.apache.doris.common.util.PrintableMap;
import org.apache.doris.common.util.PropertyAnalyzer;
import org.apache.doris.common.util.URI;
import org.apache.doris.common.util.Util;
import org.apache.doris.load.ExportJob;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.SessionVariable;
@ -39,25 +40,26 @@ import org.apache.doris.qe.VariableMgr;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import lombok.Getter;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
// EXPORT statement, export data to dirs by broker.
//
// syntax:
// EXPORT TABLE tablename [PARTITION (name1[, ...])]
// EXPORT TABLE table_name [PARTITION (name1[, ...])]
// TO 'export_target_path'
// [PROPERTIES("key"="value")]
// BY BROKER 'broker_name' [( $broker_attrs)]
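// e.g. (all values hypothetical):
// EXPORT TABLE example_db.tbl1 PARTITION (p1, p2)
// TO "hdfs://host:port/dir/export_"
// PROPERTIES ("label" = "example_label", "parallelism" = "5")
// BY BROKER "example_broker" ("username" = "user", "password" = "pass")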
@Getter
public class ExportStmt extends StatementBase {
private static final Logger LOG = LogManager.getLogger(ExportStmt.class);
public static final String PARALLELISM = "parallelism";
public static final String LABEL = "label";
@ -106,6 +108,8 @@ public class ExportStmt extends StatementBase {
private UserIdentity userIdentity;
private ExportJob exportJob;
public ExportStmt(TableRef tableRef, Expr whereExpr, String path,
Map<String, String> properties, BrokerDesc brokerDesc) {
this.tableRef = tableRef;
@ -118,75 +122,15 @@ public class ExportStmt extends StatementBase {
this.columnSeparator = DEFAULT_COLUMN_SEPARATOR;
this.lineDelimiter = DEFAULT_LINE_DELIMITER;
this.columns = DEFAULT_COLUMNS;
if (ConnectContext.get() != null) {
this.sessionVariables = ConnectContext.get().getSessionVariable();
} else {
this.sessionVariables = VariableMgr.getDefaultSessionVariable();
}
}
public String getColumns() {
return columns;
}
public TableName getTblName() {
return tblName;
}
public List<String> getPartitions() {
return partitionStringNames;
}
public Expr getWhereExpr() {
return whereExpr;
}
public String getPath() {
return path;
}
public BrokerDesc getBrokerDesc() {
return brokerDesc;
}
public String getColumnSeparator() {
return this.columnSeparator;
}
public String getLineDelimiter() {
return this.lineDelimiter;
}
public TableRef getTableRef() {
return this.tableRef;
}
public String getFormat() {
return format;
}
public String getLabel() {
return label;
}
public SessionVariable getSessionVariables() {
return sessionVariables;
}
public String getQualifiedUser() {
return qualifiedUser;
}
public UserIdentity getUserIdentity() {
return this.userIdentity;
// guard against a null ConnectContext (e.g. when not in a user session)
this.sessionVariables = Optional.ofNullable(ConnectContext.get())
        .map(ConnectContext::getSessionVariable)
        .orElseGet(VariableMgr::getDefaultSessionVariable);
}
@Override
public boolean needAuditEncryption() {
if (brokerDesc != null) {
return true;
}
return false;
return brokerDesc != null;
}
@Override
@ -197,16 +141,17 @@ public class ExportStmt extends StatementBase {
Preconditions.checkNotNull(tableRef);
tableRef.analyze(analyzer);
this.tblName = tableRef.getName();
// disallow external catalog
tblName = tableRef.getName();
Util.prohibitExternalCatalog(tblName.getCtl(), this.getClass().getSimpleName());
PartitionNames partitionNames = tableRef.getPartitionNames();
if (partitionNames != null) {
if (partitionNames.isTemp()) {
// get partition names
Optional<PartitionNames> optionalPartitionNames = Optional.ofNullable(tableRef.getPartitionNames());
if (optionalPartitionNames.isPresent()) {
if (optionalPartitionNames.get().isTemp()) {
throw new AnalysisException("Do not support exporting temporary partitions");
}
partitionStringNames = partitionNames.getPartitionNames();
partitionStringNames = optionalPartitionNames.get().getPartitionNames();
}
// check auth
@ -222,7 +167,7 @@ public class ExportStmt extends StatementBase {
userIdentity = ConnectContext.get().getCurrentUserIdentity();
// check whether the table and partitions exist
checkTable(analyzer.getEnv());
checkPartitions(analyzer.getEnv());
// check whether the broker exists
if (brokerDesc == null) {
@ -232,28 +177,86 @@ public class ExportStmt extends StatementBase {
// check path is valid
path = checkPath(path, brokerDesc.getStorageType());
if (brokerDesc.getStorageType() == StorageBackend.StorageType.BROKER) {
if (!analyzer.getEnv().getBrokerMgr().containsBroker(brokerDesc.getName())) {
BrokerMgr brokerMgr = analyzer.getEnv().getBrokerMgr();
if (!brokerMgr.containsBroker(brokerDesc.getName())) {
throw new AnalysisException("broker " + brokerDesc.getName() + " does not exist");
}
FsBroker broker = analyzer.getEnv().getBrokerMgr().getAnyBroker(brokerDesc.getName());
if (broker == null) {
if (null == brokerMgr.getAnyBroker(brokerDesc.getName())) {
throw new AnalysisException("failed to get alive broker");
}
}
// check properties
checkProperties(properties);
// create job and analyze job
setJob();
exportJob.analyze();
}
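// Editor's note: a sketch of the new submission flow, assuming the DDL layer hands the
// analyzed statement to ExportMgr (call site hypothetical, method real, see ExportMgr below):
//   ExportStmt stmt = ...;  // analyze() above has already built and analyzed the ExportJob
//   Env.getCurrentEnv().getExportMgr().addExportJobAndRegisterTask(stmt);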
private void checkTable(Env env) throws AnalysisException {
private void setJob() throws UserException {
exportJob = new ExportJob();
Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(this.tblName.getDb());
exportJob.setDbId(db.getId());
exportJob.setTableName(this.tblName);
exportJob.setExportTable(db.getTableOrDdlException(this.tblName.getTbl()));
exportJob.setTableId(db.getTableOrDdlException(this.tblName.getTbl()).getId());
exportJob.setTableRef(this.tableRef);
// set partitions
exportJob.setPartitionNames(this.partitionStringNames);
// set where expr
exportJob.setWhereExpr(this.whereExpr);
// set path
exportJob.setExportPath(this.path);
// set properties
exportJob.setLabel(this.label);
exportJob.setColumnSeparator(this.columnSeparator);
exportJob.setLineDelimiter(this.lineDelimiter);
exportJob.setFormat(this.format);
exportJob.setColumns(this.columns);
exportJob.setParallelism(this.parallelism);
exportJob.setMaxFileSize(this.maxFileSize);
exportJob.setDeleteExistingFiles(this.deleteExistingFiles);
if (!Strings.isNullOrEmpty(this.columns)) {
Splitter split = Splitter.on(',').trimResults().omitEmptyStrings();
exportJob.setExportColumns(split.splitToList(this.columns.toLowerCase()));
}
// set broker desc
exportJob.setBrokerDesc(this.brokerDesc);
// set sessions
exportJob.setQualifiedUser(this.qualifiedUser);
exportJob.setUserIdentity(this.userIdentity);
exportJob.setSessionVariables(this.sessionVariables);
exportJob.setTimeoutSecond(this.sessionVariables.getQueryTimeoutS());
exportJob.setSql(this.toSql());
exportJob.setOrigStmt(this.getOrigStmt());
}
// check that the partitions specified by the user belong to the table.
private void checkPartitions(Env env) throws AnalysisException {
if (partitionStringNames == null) {
return;
}
if (partitionStringNames.size() > Config.maximum_number_of_export_partitions) {
throw new AnalysisException("The partitions number of this export job is larger than the maximum number"
+ " of partitions allowed by an export job");
}
Database db = env.getInternalCatalog().getDbOrAnalysisException(tblName.getDb());
Table table = db.getTableOrAnalysisException(tblName.getTbl());
table.readLock();
try {
// check table
if (!table.isPartitioned()) {
throw new AnalysisException("Table[" + tblName.getTbl() + "] is not partitioned.");
}
@ -270,13 +273,14 @@ public class ExportStmt extends StatementBase {
case VIEW:
default:
throw new AnalysisException("Table[" + tblName.getTbl() + "] is "
+ tblType.toString() + " type, do not support EXPORT.");
+ tblType + " type, do not support EXPORT.");
}
for (String partitionName : partitionStringNames) {
Partition partition = table.getPartition(partitionName);
if (partition == null) {
throw new AnalysisException("Partition [" + partitionName + "] does not exist");
throw new AnalysisException("Partition [" + partitionName + "] does not exist "
+ "in Table[" + tblName.getTbl() + "]");
}
}
} finally {
@ -286,13 +290,17 @@ public class ExportStmt extends StatementBase {
public static String checkPath(String path, StorageBackend.StorageType type) throws AnalysisException {
if (Strings.isNullOrEmpty(path)) {
throw new AnalysisException("No dest path specified.");
throw new AnalysisException("No destination path specified.");
}
URI uri = URI.create(path);
String schema = uri.getScheme();
if (schema == null) {
throw new AnalysisException(
"Invalid export path, there is no schema of URI found. please check your path.");
}
if (type == StorageBackend.StorageType.BROKER) {
if (schema == null || (!schema.equalsIgnoreCase("bos")
if (!schema.equalsIgnoreCase("bos")
&& !schema.equalsIgnoreCase("afs")
&& !schema.equalsIgnoreCase("hdfs")
&& !schema.equalsIgnoreCase("ofs")
@ -302,23 +310,17 @@ public class ExportStmt extends StatementBase {
&& !schema.equalsIgnoreCase("cosn")
&& !schema.equalsIgnoreCase("gfs")
&& !schema.equalsIgnoreCase("jfs")
&& !schema.equalsIgnoreCase("gs"))) {
&& !schema.equalsIgnoreCase("gs")) {
throw new AnalysisException("Invalid broker path. please use valid 'hdfs://', 'afs://' , 'bos://',"
+ " 'ofs://', 'obs://', 'oss://', 's3a://', 'cosn://', 'gfs://', 'gs://' or 'jfs://' path.");
}
} else if (type == StorageBackend.StorageType.S3) {
if (schema == null || !schema.equalsIgnoreCase("s3")) {
throw new AnalysisException("Invalid export path. please use valid 's3://' path.");
}
} else if (type == StorageBackend.StorageType.HDFS) {
if (schema == null || !schema.equalsIgnoreCase("hdfs")) {
throw new AnalysisException("Invalid export path. please use valid 'HDFS://' path.");
}
} else if (type == StorageBackend.StorageType.LOCAL) {
if (schema != null && !schema.equalsIgnoreCase("file")) {
throw new AnalysisException(
} else if (type == StorageBackend.StorageType.S3 && !schema.equalsIgnoreCase("s3")) {
throw new AnalysisException("Invalid export path. please use valid 's3://' path.");
} else if (type == StorageBackend.StorageType.HDFS && !schema.equalsIgnoreCase("hdfs")) {
throw new AnalysisException("Invalid export path. please use valid 'HDFS://' path.");
} else if (type == StorageBackend.StorageType.LOCAL && !schema.equalsIgnoreCase("file")) {
throw new AnalysisException(
"Invalid export path. please use valid '" + OutFileClause.LOCAL_FILE_PREFIX + "' path.");
}
}
return path;
}
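// Editor's note: hedged examples of checkPath behavior (paths hypothetical):
//   checkPath("hdfs://host:8020/dir/", StorageBackend.StorageType.HDFS)  -> returns the path
//   checkPath("s3://bucket/dir/", StorageBackend.StorageType.S3)         -> returns the path
//   checkPath("/local/dir/", StorageBackend.StorageType.HDFS)            -> AnalysisException,
//       since a schema-less URI is now rejected for every storage type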
@ -326,7 +328,7 @@ public class ExportStmt extends StatementBase {
private void checkProperties(Map<String, String> properties) throws UserException {
for (String key : properties.keySet()) {
if (!PROPERTIES_SET.contains(key.toLowerCase())) {
throw new DdlException("Invalid property key: '" + key + "'");
throw new UserException("Invalid property key: [" + key + "]");
}
}
@ -348,20 +350,24 @@ public class ExportStmt extends StatementBase {
// parallelism
String parallelismString = properties.getOrDefault(PARALLELISM, DEFAULT_PARALLELISM);
parallelism = Integer.parseInt(parallelismString);
try {
this.parallelism = Integer.parseInt(parallelismString);
} catch (NumberFormatException e) {
throw new UserException("The value of parallelism is invalid!");
}
// max_file_size
this.maxFileSize = properties.getOrDefault(OutFileClause.PROP_MAX_FILE_SIZE, "");
this.deleteExistingFiles = properties.getOrDefault(OutFileClause.PROP_DELETE_EXISTING_FILES, "");
// label
if (properties.containsKey(LABEL)) {
FeNameFormat.checkLabel(properties.get(LABEL));
this.label = properties.get(LABEL);
} else {
// generate a random label
String label = "export_" + UUID.randomUUID();
properties.put(LABEL, label);
this.label = "export_" + UUID.randomUUID();
}
label = properties.get(LABEL);
}
@Override
@ -408,16 +414,4 @@ public class ExportStmt extends StatementBase {
public String toString() {
return toSql();
}
public String getMaxFileSize() {
return maxFileSize;
}
public String getDeleteExistingFiles() {
return deleteExistingFiles;
}
public Integer getParallelNum() {
return parallelism;
}
}

View File

@ -255,7 +255,7 @@ public class OutFileClause {
if (brokerDesc != null && isLocalOutput) {
throw new AnalysisException("No need to specify BROKER properties in OUTFILE clause for local file output");
} else if (brokerDesc == null && !isLocalOutput) {
throw new AnalysisException("Must specify BROKER properties in OUTFILE clause");
throw new AnalysisException("Must specify BROKER properties or current local file path in OUTFILE clause");
}
isAnalyzed = true;

View File

@ -27,7 +27,7 @@ import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.UserException;
import org.apache.doris.common.proc.ExportProcNode;
import org.apache.doris.common.util.OrderByPair;
import org.apache.doris.load.ExportJob.JobState;
import org.apache.doris.load.ExportJobState;
import org.apache.doris.qe.ShowResultSetMetaData;
import com.google.common.base.Strings;
@ -54,7 +54,7 @@ public class ShowExportStmt extends ShowStmt {
private boolean isLabelUseLike = false;
private String stateValue = null;
private JobState jobState;
private ExportJobState jobState;
private ArrayList<OrderByPair> orderByPairs;
@ -84,7 +84,7 @@ public class ShowExportStmt extends ShowStmt {
return this.jobId;
}
public JobState getJobState() {
public ExportJobState getJobState() {
if (Strings.isNullOrEmpty(stateValue)) {
return null;
}
@ -152,7 +152,7 @@ public class ShowExportStmt extends ShowStmt {
if (!Strings.isNullOrEmpty(value)) {
stateValue = value.toUpperCase();
try {
jobState = JobState.valueOf(stateValue);
jobState = ExportJobState.valueOf(stateValue);
valid = true;
} catch (IllegalArgumentException e) {
LOG.warn("illegal state argument in export stmt. stateValue={}, error={}", stateValue, e);

View File

@ -3755,7 +3755,7 @@ public class Env {
return timerJobManager;
}
public TransientTaskManager getMemoryTaskManager() {
public TransientTaskManager getTransientTaskManager() {
return transientTaskManager;
}

View File

@ -22,7 +22,7 @@ import org.apache.doris.alter.SchemaChangeHandler;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.load.ExportJob;
import org.apache.doris.load.ExportJobState;
import org.apache.doris.load.ExportMgr;
import org.apache.doris.load.loadv2.LoadManager;
@ -147,10 +147,10 @@ public class JobsProcDir implements ProcDirInterface {
// export
ExportMgr exportMgr = Env.getCurrentEnv().getExportMgr();
pendingNum = exportMgr.getJobNum(ExportJob.JobState.PENDING, dbId);
runningNum = exportMgr.getJobNum(ExportJob.JobState.EXPORTING, dbId);
finishedNum = exportMgr.getJobNum(ExportJob.JobState.FINISHED, dbId);
cancelledNum = exportMgr.getJobNum(ExportJob.JobState.CANCELLED, dbId);
pendingNum = exportMgr.getJobNum(ExportJobState.PENDING, dbId);
runningNum = exportMgr.getJobNum(ExportJobState.EXPORTING, dbId);
finishedNum = exportMgr.getJobNum(ExportJobState.FINISHED, dbId);
cancelledNum = exportMgr.getJobNum(ExportJobState.CANCELLED, dbId);
totalNum = pendingNum + runningNum + finishedNum + cancelledNum;
result.addRow(Lists.newArrayList(EXPORT, pendingNum.toString(), runningNum.toString(), finishedNum.toString(),
cancelledNum.toString(), totalNum.toString()));
@ -209,10 +209,10 @@ public class JobsProcDir implements ProcDirInterface {
// export
ExportMgr exportMgr = Env.getCurrentEnv().getExportMgr();
pendingNum = exportMgr.getJobNum(ExportJob.JobState.PENDING);
runningNum = exportMgr.getJobNum(ExportJob.JobState.EXPORTING);
finishedNum = exportMgr.getJobNum(ExportJob.JobState.FINISHED);
cancelledNum = exportMgr.getJobNum(ExportJob.JobState.CANCELLED);
pendingNum = exportMgr.getJobNum(ExportJobState.PENDING);
runningNum = exportMgr.getJobNum(ExportJobState.EXPORTING);
finishedNum = exportMgr.getJobNum(ExportJobState.FINISHED);
cancelledNum = exportMgr.getJobNum(ExportJobState.CANCELLED);
totalNum = pendingNum + runningNum + finishedNum + cancelledNum;
result.addRow(Lists.newArrayList(EXPORT, pendingNum.toString(), runningNum.toString(), finishedNum.toString(),
cancelledNum.toString(), totalNum.toString()));

View File

@ -47,6 +47,7 @@ import org.apache.doris.ha.MasterInfo;
import org.apache.doris.journal.bdbje.Timestamp;
import org.apache.doris.load.DeleteInfo;
import org.apache.doris.load.ExportJob;
import org.apache.doris.load.ExportJobStateTransfer;
import org.apache.doris.load.LoadErrorHub;
import org.apache.doris.load.LoadJob;
import org.apache.doris.load.StreamLoadRecordMgr.FetchStreamLoadRecord;
@ -319,7 +320,7 @@ public class JournalEntity implements Writable {
isRead = true;
break;
case OperationType.OP_EXPORT_UPDATE_STATE:
data = ExportJob.StateTransfer.read(in);
data = ExportJobStateTransfer.read(in);
isRead = true;
break;
case OperationType.OP_FINISH_DELETE: {

View File

@ -48,17 +48,17 @@ import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.SqlParserUtils;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.OriginStatement;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.scheduler.exception.JobException;
import org.apache.doris.scheduler.registry.ExportTaskRegister;
import org.apache.doris.scheduler.registry.TransientTaskRegister;
import org.apache.doris.task.ExportExportingTask;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TScanRangeLocations;
import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
@ -66,7 +66,7 @@ import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.gson.annotations.SerializedName;
import lombok.Getter;
import lombok.Data;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -81,27 +81,21 @@ import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
// NOTE: we must be careful when sending the next request
// as soon as we receive one instance's report from a BE,
// because we may change the job's members concurrently.
@Data
public class ExportJob implements Writable {
private static final Logger LOG = LogManager.getLogger(ExportJob.class);
private static final String BROKER_PROPERTY_PREFIXES = "broker.";
public enum JobState {
PENDING,
IN_QUEUE,
EXPORTING,
FINISHED,
CANCELLED,
}
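// NOTE: this snapshots the mutable config at class-initialization time; later changes to
// Config.maximum_tablets_of_outfile_in_export will not be picked up by this constant.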
private static final int MAXIMUM_TABLETS_OF_OUTFILE_IN_EXPORT = Config.maximum_tablets_of_outfile_in_export;
public static final TransientTaskRegister register = new ExportTaskRegister(
Env.getCurrentEnv().getTransientTaskManager());
@SerializedName("id")
private long id;
@SerializedName("queryId")
private String queryId;
@SerializedName("label")
private String label;
@SerializedName("dbId")
@ -121,7 +115,7 @@ public class ExportJob implements Writable {
@SerializedName("tableName")
private TableName tableName;
@SerializedName("state")
private JobState state;
private ExportJobState state;
@SerializedName("createTimeMs")
private long createTimeMs;
// this is the origin stmt of ExportStmt, we use it to persist where expr of Export job,
@ -153,15 +147,19 @@ public class ExportJob implements Writable {
// progress has two functions at EXPORTING stage:
// 1. when progress < 100, it indicates exporting
// 2. set progress = 100 ONLY when exporting progress is completely done
@SerializedName("progress")
private int progress;
@SerializedName("tabletsNum")
private Integer tabletsNum;
private TableRef tableRef;
private Expr whereExpr;
private String sql = "";
private Integer parallelNum;
private Integer parallelism;
public Map<String, Long> getPartitionToVersion() {
return partitionToVersion;
@ -170,9 +168,11 @@ public class ExportJob implements Writable {
private Map<String, Long> partitionToVersion = Maps.newHashMap();
// The selectStmt is sql 'select ... into outfile ...'
@Getter
// TODO(ftw): delete
private List<SelectStmt> selectStmtList = Lists.newArrayList();
private List<List<SelectStmt>> selectStmtListPerParallel = Lists.newArrayList();
private List<StmtExecutor> stmtExecutorList;
private List<String> exportColumns = Lists.newArrayList();
@ -188,16 +188,21 @@ public class ExportJob implements Writable {
private ExportExportingTask task;
private List<TScanRangeLocations> tabletLocations = Lists.newArrayList();
// backend_address => snapshot path
private List<Pair<TNetworkAddress, String>> snapshotPaths = Lists.newArrayList();
private List<ExportTaskExecutor> jobExecutorList;
private ConcurrentHashMap<Long, ExportTaskExecutor> taskIdToExecutor = new ConcurrentHashMap<>();
private Integer finishedTaskCount = 0;
private List<List<OutfileInfo>> allOutfileInfo = Lists.newArrayList();
public ExportJob() {
this.id = -1;
this.queryId = "";
this.dbId = -1;
this.tableId = -1;
this.state = JobState.PENDING;
this.state = ExportJobState.PENDING;
this.progress = 0;
this.createTimeMs = System.currentTimeMillis();
this.startTimeMs = -1;
@ -215,55 +220,37 @@ public class ExportJob implements Writable {
this.id = jobId;
}
public void setJob(ExportStmt stmt) throws UserException {
String dbName = stmt.getTblName().getDb();
Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(dbName);
Preconditions.checkNotNull(stmt.getBrokerDesc());
this.brokerDesc = stmt.getBrokerDesc();
this.columnSeparator = stmt.getColumnSeparator();
this.lineDelimiter = stmt.getLineDelimiter();
this.label = stmt.getLabel();
this.queryId = ConnectContext.get() != null ? DebugUtil.printId(ConnectContext.get().queryId()) : "N/A";
String path = stmt.getPath();
Preconditions.checkArgument(!Strings.isNullOrEmpty(path));
this.whereExpr = stmt.getWhereExpr();
this.parallelNum = stmt.getParallelNum();
this.exportPath = path;
this.sessionVariables = stmt.getSessionVariables();
this.timeoutSecond = sessionVariables.getQueryTimeoutS();
this.qualifiedUser = stmt.getQualifiedUser();
this.userIdentity = stmt.getUserIdentity();
this.format = stmt.getFormat();
this.maxFileSize = stmt.getMaxFileSize();
this.deleteExistingFiles = stmt.getDeleteExistingFiles();
this.partitionNames = stmt.getPartitions();
this.exportTable = db.getTableOrDdlException(stmt.getTblName().getTbl());
this.columns = stmt.getColumns();
this.tableRef = stmt.getTableRef();
if (!Strings.isNullOrEmpty(this.columns)) {
Splitter split = Splitter.on(',').trimResults().omitEmptyStrings();
this.exportColumns = split.splitToList(stmt.getColumns().toLowerCase());
}
/**
* For an ExportJob:
* The ExportJob is divided into multiple 'ExportTaskExecutor's
* according to the 'parallelism' set by the user.
* The tablets to be exported by this ExportJob are split into 'parallelism' groups,
* and each ExportTaskExecutor is responsible for one list of tablets.
* The tablets an ExportTaskExecutor is responsible for are further assigned to multiple
* outfile statements according to 'MAXIMUM_TABLETS_OF_OUTFILE_IN_EXPORT'.
*
* @throws UserException
*/
public void analyze() throws UserException {
exportTable.readLock();
try {
this.dbId = db.getId();
this.tableId = exportTable.getId();
this.tableName = stmt.getTblName();
if (selectStmtList.isEmpty()) {
// This scenario is used for 'EXPORT TABLE tbl INTO PATH'
// we need generate Select Statement
generateQueryStmt(stmt);
}
// generateQueryStmtOld
generateQueryStmt();
} finally {
exportTable.readUnlock();
}
this.sql = stmt.toSql();
this.origStmt = stmt.getOrigStmt();
generateExportJobExecutor();
}
private void generateQueryStmt(ExportStmt stmt) throws UserException {
public void generateExportJobExecutor() {
jobExecutorList = Lists.newArrayList();
for (List<SelectStmt> selectStmts : selectStmtListPerParallel) {
ExportTaskExecutor executor = new ExportTaskExecutor(selectStmts, this);
jobExecutorList.add(executor);
}
}
private void generateQueryStmtOld() throws UserException {
SelectList list = new SelectList();
if (exportColumns.isEmpty()) {
list.addItem(SelectListItem.createStarItem(this.tableName));
@ -278,7 +265,16 @@ public class ExportJob implements Writable {
}
}
ArrayList<ArrayList<TableRef>> tableRefListPerQuery = splitTablets(stmt);
ArrayList<ArrayList<Long>> tabletsListPerQuery = splitTablets();
ArrayList<ArrayList<TableRef>> tableRefListPerQuery = Lists.newArrayList();
for (ArrayList<Long> tabletsList : tabletsListPerQuery) {
TableRef tblRef = new TableRef(this.tableRef.getName(), this.tableRef.getAlias(), null, tabletsList,
this.tableRef.getTableSample(), this.tableRef.getCommonHints());
ArrayList<TableRef> tableRefList = Lists.newArrayList();
tableRefList.add(tblRef);
tableRefListPerQuery.add(tableRefList);
}
LOG.info("Export task is split into {} outfile statements.", tableRefListPerQuery.size());
if (LOG.isDebugEnabled()) {
@ -306,30 +302,104 @@ public class ExportJob implements Writable {
}
}
private ArrayList<ArrayList<TableRef>> splitTablets(ExportStmt stmt) throws UserException {
/**
* Generate outfile select stmt
* @throws UserException
*/
private void generateQueryStmt() throws UserException {
SelectList list = new SelectList();
if (exportColumns.isEmpty()) {
list.addItem(SelectListItem.createStarItem(this.tableName));
} else {
for (Column column : exportTable.getBaseSchema()) {
String colName = column.getName().toLowerCase();
if (exportColumns.contains(colName)) {
SlotRef slotRef = new SlotRef(this.tableName, colName);
SelectListItem selectListItem = new SelectListItem(slotRef, null);
list.addItem(selectListItem);
}
}
}
ArrayList<ArrayList<TableRef>> tableRefListPerParallel = getTableRefListPerParallel();
LOG.info("Export Job [{}] is split into {} Export Task Executor.", id, tableRefListPerParallel.size());
// debug LOG output
if (LOG.isDebugEnabled()) {
for (int i = 0; i < tableRefListPerParallel.size(); i++) {
LOG.debug("ExportTaskExecutor {} is responsible for tablets:", i);
for (TableRef tableRef : tableRefListPerParallel.get(i)) {
LOG.debug("Tablet id: [{}]", tableRef.getSampleTabletIds());
}
}
}
// generate 'select..outfile..' statement
for (ArrayList<TableRef> tableRefList : tableRefListPerParallel) {
List<SelectStmt> selectStmtLists = Lists.newArrayList();
for (TableRef tableRef : tableRefList) {
ArrayList<TableRef> tmpTableRefList = Lists.newArrayList(tableRef);
FromClause fromClause = new FromClause(tmpTableRefList);
// generate outfile clause
OutFileClause outfile = new OutFileClause(this.exportPath, this.format, convertOutfileProperties());
SelectStmt selectStmt = new SelectStmt(list, fromClause, this.whereExpr, null,
null, null, LimitElement.NO_LIMIT);
selectStmt.setOutFileClause(outfile);
selectStmt.setOrigStmt(new OriginStatement(selectStmt.toSql(), 0));
selectStmtLists.add(selectStmt);
}
selectStmtListPerParallel.add(selectStmtLists);
}
// debug LOG output
if (LOG.isDebugEnabled()) {
for (int i = 0; i < selectStmtListPerParallel.size(); ++i) {
LOG.debug("ExportTaskExecutor {} is responsible for outfile:", i);
for (SelectStmt outfile : selectStmtListPerParallel.get(i)) {
LOG.debug("outfile sql: [{}]", outfile.toSql());
}
}
}
}
private ArrayList<ArrayList<TableRef>> getTableRefListPerParallel() throws UserException {
ArrayList<ArrayList<Long>> tabletsListPerParallel = splitTablets();
ArrayList<ArrayList<TableRef>> tableRefListPerParallel = Lists.newArrayList();
for (ArrayList<Long> tabletsList : tabletsListPerParallel) {
ArrayList<TableRef> tableRefList = Lists.newArrayList();
for (int i = 0; i < tabletsList.size(); i += MAXIMUM_TABLETS_OF_OUTFILE_IN_EXPORT) {
int end = i + MAXIMUM_TABLETS_OF_OUTFILE_IN_EXPORT < tabletsList.size()
? i + MAXIMUM_TABLETS_OF_OUTFILE_IN_EXPORT : tabletsList.size();
ArrayList<Long> tablets = new ArrayList<>(tabletsList.subList(i, end));
TableRef tblRef = new TableRef(this.tableRef.getName(), this.tableRef.getAlias(),
this.tableRef.getPartitionNames(), tablets,
this.tableRef.getTableSample(), this.tableRef.getCommonHints());
tableRefList.add(tblRef);
}
tableRefListPerParallel.add(tableRefList);
}
return tableRefListPerParallel;
}
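// e.g. (hypothetical numbers): an executor assigned 25 tablets with
// MAXIMUM_TABLETS_OF_OUTFILE_IN_EXPORT = 10 yields three TableRefs covering
// 10, 10 and 5 tablets, i.e. three outfile statements.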
private ArrayList<ArrayList<Long>> splitTablets() throws UserException {
// get tablets
Database db = Env.getCurrentEnv().getInternalCatalog().getDbOrAnalysisException(stmt.getTblName().getDb());
OlapTable table = db.getOlapTableOrAnalysisException(stmt.getTblName().getTbl());
Database db = Env.getCurrentEnv().getInternalCatalog().getDbOrAnalysisException(this.tableName.getDb());
OlapTable table = db.getOlapTableOrAnalysisException(this.tableName.getTbl());
List<Long> tabletIdList = Lists.newArrayList();
table.readLock();
try {
Collection<Partition> partitions = new ArrayList<Partition>();
final Collection<Partition> partitions = new ArrayList<Partition>();
// get partitions
// user specifies partitions, already checked in ExportStmt
if (this.partitionNames != null) {
if (partitionNames.size() > Config.maximum_number_of_export_partitions) {
throw new UserException("The partitions number of this export job is larger than the maximum number"
+ " of partitions allowed by an export job");
}
for (String partName : this.partitionNames) {
partitions.add(table.getPartition(partName));
}
this.partitionNames.forEach(partitionName -> partitions.add(table.getPartition(partitionName)));
} else {
if (table.getPartitions().size() > Config.maximum_number_of_export_partitions) {
throw new UserException("The partitions number of this export job is larger than the maximum number"
+ " of partitions allowed by an export job");
}
partitions = table.getPartitions();
partitions.addAll(table.getPartitions());
}
// get tablets
@ -344,38 +414,34 @@ public class ExportJob implements Writable {
}
Integer tabletsAllNum = tabletIdList.size();
Integer tabletsNumPerQuery = tabletsAllNum / this.parallelNum;
Integer tabletsNumPerQueryRemainder = tabletsAllNum - tabletsNumPerQuery * this.parallelNum;
tabletsNum = tabletsAllNum;
Integer tabletsNumPerParallel = tabletsAllNum / this.parallelism;
Integer tabletsNumPerQueryRemainder = tabletsAllNum - tabletsNumPerParallel * this.parallelism;
Integer start = 0;
ArrayList<ArrayList<TableRef>> tableRefListPerQuery = Lists.newArrayList();
int outfileNum = this.parallelNum;
if (tabletsAllNum < this.parallelNum) {
outfileNum = tabletsAllNum;
ArrayList<ArrayList<Long>> tabletsListPerParallel = Lists.newArrayList();
Integer realParallelism = this.parallelism;
if (tabletsAllNum < this.parallelism) {
realParallelism = tabletsAllNum;
LOG.warn("Export Job [{}]: The number of tablets ({}) is smaller than parallelism ({}), "
+ "set parallelism to tablets num.", id, tabletsAllNum, this.parallelNum);
+ "set parallelism to tablets num.", id, tabletsAllNum, this.parallelism);
}
for (int i = 0; i < outfileNum; ++i) {
Integer tabletsNum = tabletsNumPerQuery;
Integer start = 0;
for (int i = 0; i < realParallelism; ++i) {
Integer tabletsNum = tabletsNumPerParallel;
if (tabletsNumPerQueryRemainder > 0) {
tabletsNum = tabletsNum + 1;
--tabletsNumPerQueryRemainder;
}
ArrayList<Long> tablets = new ArrayList<>(tabletIdList.subList(start, start + tabletsNum));
start += tabletsNum;
TableRef tblRef = new TableRef(this.tableRef.getName(), this.tableRef.getAlias(), null, tablets,
this.tableRef.getTableSample(), this.tableRef.getCommonHints());
ArrayList<TableRef> tableRefList = Lists.newArrayList();
tableRefList.add(tblRef);
tableRefListPerQuery.add(tableRefList);
tabletsListPerParallel.add(tablets);
}
return tableRefListPerQuery;
return tabletsListPerParallel;
}
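Editor's sketch (not part of the patch): with 10 tablets and parallelism = 3, the code above computes tabletsNumPerParallel = 3 with remainder 1, so the three executors receive 4, 3 and 3 tablets. A self-contained equivalent of this split, assuming java.util.List/ArrayList and a hypothetical method name:
static List<List<Long>> splitEvenly(List<Long> tabletIds, int parallelism) {
    List<List<Long>> result = new ArrayList<>();
    if (tabletIds.isEmpty()) {
        return result; // nothing to split
    }
    int realParallelism = Math.min(parallelism, tabletIds.size()); // same fallback as above
    int per = tabletIds.size() / realParallelism;
    int remainder = tabletIds.size() % realParallelism;
    int start = 0;
    for (int i = 0; i < realParallelism; i++) {
        int size = per + (i < remainder ? 1 : 0); // the first `remainder` lists get one extra tablet
        result.add(new ArrayList<>(tabletIds.subList(start, start + size)));
        start += size;
    }
    return result;
}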
private Map<String, String> convertOutfileProperties() {
Map<String, String> outfileProperties = Maps.newHashMap();
final Map<String, String> outfileProperties = Maps.newHashMap();
// file properties
if (format.equals("csv") || format.equals("csv_with_names") || format.equals("csv_with_names_and_types")) {
@ -393,9 +459,7 @@ public class ExportJob implements Writable {
// outfile clause's broker properties need 'broker.' prefix
if (brokerDesc.getStorageType() == StorageType.BROKER) {
outfileProperties.put(BROKER_PROPERTY_PREFIXES + "name", brokerDesc.getName());
for (Entry<String, String> kv : brokerDesc.getProperties().entrySet()) {
outfileProperties.put(BROKER_PROPERTY_PREFIXES + kv.getKey(), kv.getValue());
}
brokerDesc.getProperties().forEach((k, v) -> outfileProperties.put(BROKER_PROPERTY_PREFIXES + k, v));
} else {
for (Entry<String, String> kv : brokerDesc.getProperties().entrySet()) {
outfileProperties.put(kv.getKey(), kv.getValue());
@ -404,131 +468,25 @@ public class ExportJob implements Writable {
return outfileProperties;
}
public String getColumns() {
return columns;
}
public long getId() {
return id;
}
public long getDbId() {
return dbId;
}
public long getTableId() {
return this.tableId;
}
public Expr getWhereExpr() {
return whereExpr;
}
public synchronized JobState getState() {
public synchronized ExportJobState getState() {
return state;
}
public BrokerDesc getBrokerDesc() {
return brokerDesc;
private void setExportJobState(ExportJobState newState) {
this.state = newState;
}
public void setBrokerDesc(BrokerDesc brokerDesc) {
this.brokerDesc = brokerDesc;
}
public String getExportPath() {
return exportPath;
}
public String getColumnSeparator() {
return this.columnSeparator;
}
public String getLineDelimiter() {
return this.lineDelimiter;
}
public int getTimeoutSecond() {
return timeoutSecond;
}
public String getFormat() {
return format;
}
public String getMaxFileSize() {
return maxFileSize;
}
public String getDeleteExistingFiles() {
return deleteExistingFiles;
}
public String getQualifiedUser() {
return qualifiedUser;
}
public UserIdentity getUserIdentity() {
return userIdentity;
}
public List<String> getPartitions() {
return partitionNames;
}
public int getProgress() {
return progress;
}
public void setProgress(int progress) {
this.progress = progress;
}
public long getCreateTimeMs() {
return createTimeMs;
}
public long getStartTimeMs() {
return startTimeMs;
}
public void setStartTimeMs(long startTimeMs) {
this.startTimeMs = startTimeMs;
}
public long getFinishTimeMs() {
return finishTimeMs;
}
public void setFinishTimeMs(long finishTimeMs) {
this.finishTimeMs = finishTimeMs;
}
public ExportFailMsg getFailMsg() {
return failMsg;
}
public void setFailMsg(ExportFailMsg failMsg) {
this.failMsg = failMsg;
}
public String getOutfileInfo() {
return outfileInfo;
}
public void setOutfileInfo(String outfileInfo) {
this.outfileInfo = outfileInfo;
}
// TODO(ftw): delete
public synchronized Thread getDoExportingThread() {
return doExportingThread;
}
// TODO(ftw): delete
public synchronized void setDoExportingThread(Thread isExportingThread) {
this.doExportingThread = isExportingThread;
}
// TODO(ftw): delete
public synchronized void setStmtExecutor(int idx, StmtExecutor executor) {
this.stmtExecutorList.set(idx, executor);
}
@ -537,51 +495,20 @@ public class ExportJob implements Writable {
return this.stmtExecutorList.get(idx);
}
public List<TScanRangeLocations> getTabletLocations() {
return tabletLocations;
}
public List<Pair<TNetworkAddress, String>> getSnapshotPaths() {
return this.snapshotPaths;
}
public void addSnapshotPath(Pair<TNetworkAddress, String> snapshotPath) {
this.snapshotPaths.add(snapshotPath);
}
public String getSql() {
return sql;
}
public ExportExportingTask getTask() {
return task;
}
public void setTask(ExportExportingTask task) {
this.task = task;
}
public TableName getTableName() {
return tableName;
}
public SessionVariable getSessionVariables() {
return sessionVariables;
}
// TODO(ftw): delete
public synchronized void cancel(ExportFailMsg.CancelType type, String msg) {
if (msg != null) {
failMsg = new ExportFailMsg(type, msg);
}
// maybe user cancel this job
if (task != null && state == JobState.EXPORTING && stmtExecutorList != null) {
if (task != null && state == ExportJobState.EXPORTING && stmtExecutorList != null) {
for (int idx = 0; idx < stmtExecutorList.size(); ++idx) {
stmtExecutorList.get(idx).cancel();
}
}
if (updateState(ExportJob.JobState.CANCELLED, false)) {
if (updateState(ExportJobState.CANCELLED, false)) {
// release snapshot
// Status releaseSnapshotStatus = releaseSnapshotPaths();
// if (!releaseSnapshotStatus.ok()) {
@ -592,23 +519,150 @@ public class ExportJob implements Writable {
}
}
public synchronized void updateExportJobState(ExportJobState newState, Long taskId,
List<OutfileInfo> outfileInfoList, ExportFailMsg.CancelType type, String msg) throws JobException {
switch (newState) {
case PENDING:
throw new JobException("Can not update ExportJob state to 'PENDING', job id: [{}], task id: [{}]",
id, taskId);
case EXPORTING:
exportExportJob();
break;
case CANCELLED:
cancelExportTask(type, msg);
break;
case FINISHED:
finishExportTask(taskId, outfileInfoList);
break;
default:
return;
}
}
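// Editor's sketch of the call sites (hypothetical, except the cancel path, which mirrors
// ExportMgr.cancelExportJob below):
//   on task start:   job.updateExportJobState(ExportJobState.EXPORTING, taskId, null, null, null);
//   on task finish:  job.updateExportJobState(ExportJobState.FINISHED, taskId, outfileInfoList, null, null);
//   on user cancel:  job.updateExportJobState(ExportJobState.CANCELLED, 0L, null,
//                        ExportFailMsg.CancelType.USER_CANCEL, "user cancel");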
public void cancelReplayedExportJob(ExportFailMsg.CancelType type, String msg) {
setExportJobState(ExportJobState.CANCELLED);
failMsg = new ExportFailMsg(type, msg);
}
private void cancelExportTask(ExportFailMsg.CancelType type, String msg) throws JobException {
if (getState() == ExportJobState.CANCELLED) {
return;
}
if (getState() == ExportJobState.FINISHED) {
throw new JobException("Job {} has finished, can not been cancelled", id);
}
if (getState() == ExportJobState.PENDING) {
startTimeMs = System.currentTimeMillis();
}
// we need to cancel all tasks
taskIdToExecutor.keySet().forEach(id -> {
try {
register.cancelTask(id);
} catch (JobException e) {
LOG.warn("cancel export task {} exception: {}", id, e);
}
});
cancelExportJobUnprotected(type, msg);
}
private void cancelExportJobUnprotected(ExportFailMsg.CancelType type, String msg) {
setExportJobState(ExportJobState.CANCELLED);
finishTimeMs = System.currentTimeMillis();
failMsg = new ExportFailMsg(type, msg);
Env.getCurrentEnv().getEditLog().logExportUpdateState(id, ExportJobState.CANCELLED);
}
// TODO(ftw): delete
public synchronized boolean finish(List<OutfileInfo> outfileInfoList) {
outfileInfo = GsonUtils.GSON.toJson(outfileInfoList);
if (updateState(ExportJob.JobState.FINISHED)) {
if (updateState(ExportJobState.FINISHED)) {
return true;
}
return false;
}
public synchronized boolean updateState(ExportJob.JobState newState) {
private void exportExportJob() {
// The first exportTaskExecutor sets the state to EXPORTING;
// the other exportTaskExecutors do not need to set it again.
if (getState() == ExportJobState.EXPORTING) {
return;
}
setExportJobState(ExportJobState.EXPORTING);
// if isReplay == true, startTimeMs will be read from LOG
startTimeMs = System.currentTimeMillis();
}
private void finishExportTask(Long taskId, List<OutfileInfo> outfileInfoList) throws JobException {
if (getState() == ExportJobState.CANCELLED) {
throw new JobException("Job [{}] has been cancelled, can not finish this task: {}", id, taskId);
}
allOutfileInfo.add(outfileInfoList);
++finishedTaskCount;
// calculate progress
int tmpProgress = finishedTaskCount * 100 / jobExecutorList.size();
// hold progress at 99 until every task has finished; only finishExportJobUnprotected() sets 100
progress = Math.min(tmpProgress, 99);
// if all task finished
if (finishedTaskCount == jobExecutorList.size()) {
finishExportJobUnprotected();
}
}
private void finishExportJobUnprotected() {
progress = 100;
setExportJobState(ExportJobState.FINISHED);
finishTimeMs = System.currentTimeMillis();
outfileInfo = GsonUtils.GSON.toJson(allOutfileInfo);
Env.getCurrentEnv().getEditLog().logExportUpdateState(id, ExportJobState.FINISHED);
}
public void replayExportJobState(ExportJobState newState) {
switch (newState) {
// We do not persist EXPORTING state in new version of metadata,
// but EXPORTING state may still exist in older versions of metadata.
// So if isReplay == true and newState == EXPORTING, we set newState = CANCELLED.
case EXPORTING:
// We do not need IN_QUEUE state in new version of export
// but IN_QUEUE state may still exist in older versions of metadata.
// So if isReplay == true and newState == IN_QUEUE, we set newState = CANCELLED.
case IN_QUEUE:
newState = ExportJobState.CANCELLED;
break;
case PENDING:
case CANCELLED:
progress = 0;
break;
case FINISHED:
progress = 100;
break;
default:
Preconditions.checkState(false, "wrong job state: " + newState.name());
break;
}
setExportJobState(newState);
}
// TODO(ftw): delete
public synchronized boolean updateState(ExportJobState newState) {
return this.updateState(newState, false);
}
public synchronized boolean updateState(ExportJob.JobState newState, boolean isReplay) {
// TODO(ftw): delete
public synchronized boolean updateState(ExportJobState newState, boolean isReplay) {
// We do not persist EXPORTING state in new version of metadata,
// but EXPORTING state may still exist in older versions of metadata.
// So if isReplay == true and newState == EXPORTING, we just ignore this update.
if (isFinalState() || (isReplay && newState == JobState.EXPORTING)) {
if (isFinalState() || (isReplay && newState == ExportJobState.EXPORTING)) {
return false;
}
state = newState;
@ -618,7 +672,7 @@ public class ExportJob implements Writable {
progress = 0;
break;
case EXPORTING:
// if isReplay == true, startTimeMs will be read from log
// if isReplay == true, startTimeMs will be read from LOG
if (!isReplay) {
startTimeMs = System.currentTimeMillis();
}
@ -630,7 +684,7 @@ public class ExportJob implements Writable {
progress = 100;
break;
case CANCELLED:
// if isReplay == true, finishTimeMs will be read from log
// if isReplay == true, finishTimeMs will be read from LOG
if (!isReplay) {
finishTimeMs = System.currentTimeMillis();
}
@ -640,27 +694,19 @@ public class ExportJob implements Writable {
break;
}
// we only persist Pending/Cancel/Finish state
if (!isReplay && newState != JobState.IN_QUEUE && newState != JobState.EXPORTING) {
if (!isReplay && newState != ExportJobState.IN_QUEUE && newState != ExportJobState.EXPORTING) {
Env.getCurrentEnv().getEditLog().logExportUpdateState(id, newState);
}
return true;
}
public synchronized boolean isFinalState() {
return this.state == ExportJob.JobState.CANCELLED || this.state == ExportJob.JobState.FINISHED;
return this.state == ExportJobState.CANCELLED || this.state == ExportJobState.FINISHED;
}
public boolean isExpired(long curTime) {
return (curTime - createTimeMs) / 1000 > Config.history_job_keep_max_second
&& (state == ExportJob.JobState.CANCELLED || state == ExportJob.JobState.FINISHED);
}
public String getLabel() {
return label;
}
public String getQueryId() {
return queryId;
&& (state == ExportJobState.CANCELLED || state == ExportJobState.FINISHED);
}
@Override
@ -737,7 +783,7 @@ public class ExportJob implements Writable {
}
}
state = JobState.valueOf(Text.readString(in));
state = ExportJobState.valueOf(Text.readString(in));
createTimeMs = in.readLong();
startTimeMs = in.readLong();
finishTimeMs = in.readLong();
@ -793,132 +839,4 @@ public class ExportJob implements Writable {
return false;
}
public boolean isReplayed() {
return isReplayed;
}
// for only persist op when switching job state.
public static class StateTransfer implements Writable {
@SerializedName("jobId")
long jobId;
@SerializedName("state")
JobState state;
@SerializedName("startTimeMs")
private long startTimeMs;
@SerializedName("finishTimeMs")
private long finishTimeMs;
@SerializedName("failMsg")
private ExportFailMsg failMsg;
@SerializedName("outFileInfo")
private String outFileInfo;
// used for reading from one log
public StateTransfer() {
this.jobId = -1;
this.state = JobState.CANCELLED;
this.failMsg = new ExportFailMsg(ExportFailMsg.CancelType.UNKNOWN, "");
this.outFileInfo = "";
}
// used for persisting one log
public StateTransfer(long jobId, JobState state) {
this.jobId = jobId;
this.state = state;
ExportJob job = Env.getCurrentEnv().getExportMgr().getJob(jobId);
this.startTimeMs = job.getStartTimeMs();
this.finishTimeMs = job.getFinishTimeMs();
this.failMsg = job.getFailMsg();
this.outFileInfo = job.getOutfileInfo();
}
public long getJobId() {
return jobId;
}
public JobState getState() {
return state;
}
@Override
public void write(DataOutput out) throws IOException {
String json = GsonUtils.GSON.toJson(this);
Text.writeString(out, json);
}
public static StateTransfer read(DataInput in) throws IOException {
if (Env.getCurrentEnvJournalVersion() < FeMetaVersion.VERSION_120) {
StateTransfer transfer = new StateTransfer();
transfer.readFields(in);
return transfer;
}
String json = Text.readString(in);
StateTransfer transfer = GsonUtils.GSON.fromJson(json, ExportJob.StateTransfer.class);
return transfer;
}
private void readFields(DataInput in) throws IOException {
jobId = in.readLong();
state = JobState.valueOf(Text.readString(in));
}
public long getStartTimeMs() {
return startTimeMs;
}
public long getFinishTimeMs() {
return finishTimeMs;
}
public String getOutFileInfo() {
return outFileInfo;
}
public ExportFailMsg getFailMsg() {
return failMsg;
}
}
public static class OutfileInfo {
@SerializedName("fileNumber")
private String fileNumber;
@SerializedName("totalRows")
private String totalRows;
@SerializedName("fileSize")
private String fileSize;
@SerializedName("url")
private String url;
public String getUrl() {
return url;
}
public void setUrl(String url) {
this.url = url;
}
public String getFileNumber() {
return fileNumber;
}
public void setFileNumber(String fileNumber) {
this.fileNumber = fileNumber;
}
public String getTotalRows() {
return totalRows;
}
public void setTotalRows(String totalRows) {
this.totalRows = totalRows;
}
public String getFileSize() {
return fileSize;
}
public void setFileSize(String fileSize) {
this.fileSize = fileSize;
}
}
}

View File

@ -0,0 +1,46 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.load;
public enum ExportJobState {
/**
* The initial state of an export job.
*/
PENDING,
/**
* When the export job is waiting to be scheduled.
*/
IN_QUEUE,
/**
* When the export job is exporting, the EXPORTING state will be triggered.
*/
EXPORTING,
/**
* When the export job is finished, the FINISHED state will be triggered.
*/
FINISHED,
/**
* When the export job is cancelled, the CANCELLED state will be triggered.
*/
CANCELLED,
}
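Editor's note: the transitions this enum takes part in, as enforced by ExportJob.updateExportJobState and replayExportJobState (IN_QUEUE is kept only so old metadata can still be replayed, where it is rewritten to CANCELLED):
PENDING -> EXPORTING -> FINISHED
PENDING -> CANCELLED
EXPORTING -> CANCELLED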

View File

@ -0,0 +1,88 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.load;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.persist.gson.GsonUtils;
import com.google.gson.annotations.SerializedName;
import lombok.Getter;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
@Getter
public class ExportJobStateTransfer implements Writable {
@SerializedName("jobId")
long jobId;
@SerializedName("state")
private ExportJobState state;
@SerializedName("startTimeMs")
private long startTimeMs;
@SerializedName("finishTimeMs")
private long finishTimeMs;
@SerializedName("failMsg")
private ExportFailMsg failMsg;
@SerializedName("outFileInfo")
private String outFileInfo;
// used for reading from one log
public ExportJobStateTransfer() {
this.jobId = -1;
this.state = ExportJobState.CANCELLED;
this.failMsg = new ExportFailMsg(ExportFailMsg.CancelType.UNKNOWN, "");
this.outFileInfo = "";
}
// used for persisting one log
public ExportJobStateTransfer(long jobId, ExportJobState state) {
this.jobId = jobId;
this.state = state;
ExportJob job = Env.getCurrentEnv().getExportMgr().getJob(jobId);
this.startTimeMs = job.getStartTimeMs();
this.finishTimeMs = job.getFinishTimeMs();
this.failMsg = job.getFailMsg();
this.outFileInfo = job.getOutfileInfo();
}
@Override
public void write(DataOutput out) throws IOException {
String json = GsonUtils.GSON.toJson(this);
Text.writeString(out, json);
}
public static ExportJobStateTransfer read(DataInput in) throws IOException {
if (Env.getCurrentEnvJournalVersion() < FeMetaVersion.VERSION_120) {
ExportJobStateTransfer transfer = new ExportJobStateTransfer();
transfer.readFields(in);
return transfer;
}
String json = Text.readString(in);
ExportJobStateTransfer transfer = GsonUtils.GSON.fromJson(json, ExportJobStateTransfer.class);
return transfer;
}
private void readFields(DataInput in) throws IOException {
jobId = in.readLong();
state = ExportJobState.valueOf(Text.readString(in));
}
}

View File

@ -35,9 +35,9 @@ import org.apache.doris.common.util.ListComparator;
import org.apache.doris.common.util.MasterDaemon;
import org.apache.doris.common.util.OrderByPair;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.load.ExportJob.JobState;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.scheduler.exception.JobException;
import org.apache.doris.task.ExportExportingTask;
import org.apache.doris.task.MasterTask;
import org.apache.doris.task.MasterTaskExecutor;
@ -69,8 +69,8 @@ public class ExportMgr extends MasterDaemon {
// lock is private and must use after db lock
private ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
private Map<Long, ExportJob> idToJob = Maps.newHashMap(); // exportJobId to exportJob
private Map<String, Long> labelToJobId = Maps.newHashMap();
private Map<Long, ExportJob> exportIdToJob = Maps.newHashMap(); // exportJobId to exportJob
private Map<String, Long> labelToExportJobId = Maps.newHashMap();
private MasterTaskExecutor exportingExecutor;
@ -103,7 +103,7 @@ public class ExportMgr extends MasterDaemon {
@Override
protected void runAfterCatalogReady() {
List<ExportJob> pendingJobs = getExportJobs(JobState.PENDING);
List<ExportJob> pendingJobs = getExportJobs(ExportJobState.PENDING);
List<ExportJob> newInQueueJobs = Lists.newArrayList();
for (ExportJob job : pendingJobs) {
if (handlePendingJobs(job)) {
@ -128,7 +128,7 @@ public class ExportMgr extends MasterDaemon {
private boolean handlePendingJobs(ExportJob job) {
// because this job may have been cancelled by the user.
if (job.getState() != JobState.PENDING) {
if (job.getState() != ExportJobState.PENDING) {
return false;
}
@ -136,11 +136,12 @@ public class ExportMgr extends MasterDaemon {
// If the job is created from replay thread, all plan info will be lost.
// so the job has to be cancelled.
String failMsg = "FE restarted or Master changed during exporting. Job must be cancelled.";
job.cancel(ExportFailMsg.CancelType.RUN_FAIL, failMsg);
job.cancelReplayedExportJob(ExportFailMsg.CancelType.RUN_FAIL, failMsg);
return false;
}
if (job.updateState(JobState.IN_QUEUE)) {
if (job.updateState(ExportJobState.IN_QUEUE)) {
LOG.info("Exchange pending status to in_queue status success. job: {}", job);
return true;
}
@ -148,7 +149,7 @@ public class ExportMgr extends MasterDaemon {
}
public List<ExportJob> getJobs() {
return Lists.newArrayList(idToJob.values());
return Lists.newArrayList(exportIdToJob.values());
}
public void addExportJob(ExportStmt stmt) throws Exception {
@ -156,7 +157,7 @@ public class ExportMgr extends MasterDaemon {
ExportJob job = createJob(jobId, stmt);
writeLock();
try {
if (labelToJobId.containsKey(job.getLabel())) {
if (labelToExportJobId.containsKey(job.getLabel())) {
throw new LabelAlreadyUsedException(job.getLabel());
}
unprotectAddJob(job);
@ -167,6 +168,28 @@ public class ExportMgr extends MasterDaemon {
LOG.info("add export job. {}", job);
}
public void addExportJobAndRegisterTask(ExportStmt stmt) throws Exception {
ExportJob job = stmt.getExportJob();
long jobId = Env.getCurrentEnv().getNextId();
job.setId(jobId);
writeLock();
try {
if (labelToExportJobId.containsKey(job.getLabel())) {
throw new LabelAlreadyUsedException(job.getLabel());
}
unprotectAddJob(job);
job.getJobExecutorList().forEach(executor -> {
Long taskId = ExportJob.register.registerTask(executor);
executor.setTaskId(taskId);
job.getTaskIdToExecutor().put(taskId, executor);
});
Env.getCurrentEnv().getEditLog().logExportCreate(job);
} finally {
writeUnlock();
}
LOG.info("add export job. {}", job);
}
public void cancelExportJob(CancelExportStmt stmt) throws DdlException, AnalysisException {
// List of export jobs waiting to be cancelled
List<ExportJob> matchExportJobs = getWaitingCancelJobs(stmt);
@ -178,14 +201,20 @@ public class ExportMgr extends MasterDaemon {
if (matchExportJobs.isEmpty()) {
throw new DdlException("All export job(s) are at final state (CANCELLED/FINISHED)");
}
for (ExportJob exportJob : matchExportJobs) {
exportJob.cancel(ExportFailMsg.CancelType.USER_CANCEL, "user cancel");
try {
for (ExportJob exportJob : matchExportJobs) {
exportJob.updateExportJobState(ExportJobState.CANCELLED, 0L, null,
ExportFailMsg.CancelType.USER_CANCEL, "user cancel");
}
} catch (JobException e) {
throw new AnalysisException(e.getMessage());
}
}
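
All state changes on the new code path funnel through updateExportJobState, whose five arguments can be read off the call site above and the ones in ExportTaskExecutor below: the target state, the id of the task driving the transition, the collected outfile info (meaningful only for FINISHED), and a cancel type plus message (meaningful only for CANCELLED). A hedged sketch of that dispatch, assuming this shape; the real method lives in ExportJob and is not part of this diff:

// Illustrative only: the real ExportJob.updateExportJobState is not shown in this commit.
public void updateExportJobState(ExportJobState newState, long taskId,
        List<OutfileInfo> outfileInfoList, ExportFailMsg.CancelType cancelType,
        String msg) throws JobException {
    switch (newState) {
        case EXPORTING:
            // the first task to run flips the job from PENDING/IN_QUEUE to EXPORTING
            break;
        case FINISHED:
            // record outfileInfoList and log the state transfer to the edit log
            break;
        case CANCELLED:
            // record new ExportFailMsg(cancelType, msg) and cancel sibling tasks
            break;
        default:
            throw new JobException("unexpected export job state: " + newState);
    }
}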
public void unprotectAddJob(ExportJob job) {
idToJob.put(job.getId(), job);
labelToJobId.putIfAbsent(job.getLabel(), job.getId());
exportIdToJob.put(job.getId(), job);
labelToExportJobId.putIfAbsent(job.getLabel(), job.getId());
}
private List<ExportJob> getWaitingCancelJobs(CancelExportStmt stmt) throws AnalysisException {
@ -224,28 +253,28 @@ public class ExportMgr extends MasterDaemon {
};
}
private ExportJob createJob(long jobId, ExportStmt stmt) throws Exception {
ExportJob job = new ExportJob(jobId);
job.setJob(stmt);
return job;
private ExportJob createJob(long jobId, ExportStmt stmt) {
ExportJob exportJob = stmt.getExportJob();
exportJob.setId(jobId);
return exportJob;
}
public ExportJob getJob(long jobId) {
ExportJob job = null;
ExportJob job;
readLock();
try {
job = idToJob.get(jobId);
job = exportIdToJob.get(jobId);
} finally {
readUnlock();
}
return job;
}
public List<ExportJob> getExportJobs(ExportJob.JobState state) {
public List<ExportJob> getExportJobs(ExportJobState state) {
List<ExportJob> result = Lists.newArrayList();
readLock();
try {
for (ExportJob job : idToJob.values()) {
for (ExportJob job : exportIdToJob.values()) {
if (job.getState() == state) {
result.add(job);
}
@ -260,7 +289,7 @@ public class ExportMgr extends MasterDaemon {
// used for `show export` statement
// NOTE: jobId and states may both be specified, or only one of them, or neither
public List<List<String>> getExportJobInfosByIdOrState(
long dbId, long jobId, String label, boolean isLabelUseLike, Set<ExportJob.JobState> states,
long dbId, long jobId, String label, boolean isLabelUseLike, Set<ExportJobState> states,
ArrayList<OrderByPair> orderByPairs, long limit) throws AnalysisException {
long resultNum = limit == -1L ? Integer.MAX_VALUE : limit;
@ -273,9 +302,9 @@ public class ExportMgr extends MasterDaemon {
readLock();
try {
int counter = 0;
for (ExportJob job : idToJob.values()) {
for (ExportJob job : exportIdToJob.values()) {
long id = job.getId();
ExportJob.JobState state = job.getState();
ExportJobState state = job.getState();
String jobLabel = job.getLabel();
if (job.getDbId() != dbId) {
@ -345,7 +374,7 @@ public class ExportMgr extends MasterDaemon {
readLock();
try {
int counter = 0;
for (ExportJob job : idToJob.values()) {
for (ExportJob job : exportIdToJob.values()) {
// check auth
if (isJobShowable(job)) {
exportJobInfos.add(composeExportJobInfo(job));
@ -406,7 +435,7 @@ public class ExportMgr extends MasterDaemon {
// task infos
Map<String, Object> infoMap = Maps.newHashMap();
List<String> partitions = job.getPartitions();
List<String> partitions = job.getPartitionNames();
if (partitions == null) {
partitions = Lists.newArrayList();
partitions.add("*");
@ -422,7 +451,7 @@ public class ExportMgr extends MasterDaemon {
infoMap.put("format", job.getFormat());
infoMap.put("line_delimiter", job.getLineDelimiter());
infoMap.put("columns", job.getColumns());
infoMap.put("tablet_num", job.getTabletLocations() == null ? -1 : job.getTabletLocations().size());
infoMap.put("tablet_num", job.getTabletsNum());
infoMap.put("max_file_size", job.getMaxFileSize());
infoMap.put("delete_existing_files", job.getDeleteExistingFiles());
jobInfo.add(new Gson().toJson(infoMap));
@ -435,7 +464,7 @@ public class ExportMgr extends MasterDaemon {
jobInfo.add(job.getTimeoutSecond());
// error msg
if (job.getState() == ExportJob.JobState.CANCELLED) {
if (job.getState() == ExportJobState.CANCELLED) {
ExportFailMsg failMsg = job.getFailMsg();
jobInfo.add("type:" + failMsg.getCancelType() + "; msg:" + failMsg.getMsg());
} else {
@ -443,7 +472,7 @@ public class ExportMgr extends MasterDaemon {
}
// outfileInfo
if (job.getState() == JobState.FINISHED) {
if (job.getState() == ExportJobState.FINISHED) {
jobInfo.add(job.getOutfileInfo());
} else {
jobInfo.add(FeConstants.null_string);
@ -457,15 +486,15 @@ public class ExportMgr extends MasterDaemon {
writeLock();
try {
Iterator<Map.Entry<Long, ExportJob>> iter = idToJob.entrySet().iterator();
Iterator<Map.Entry<Long, ExportJob>> iter = exportIdToJob.entrySet().iterator();
while (iter.hasNext()) {
Map.Entry<Long, ExportJob> entry = iter.next();
ExportJob job = entry.getValue();
if ((currentTimeMs - job.getCreateTimeMs()) / 1000 > Config.history_job_keep_max_second
&& (job.getState() == ExportJob.JobState.CANCELLED
|| job.getState() == ExportJob.JobState.FINISHED)) {
&& (job.getState() == ExportJobState.CANCELLED
|| job.getState() == ExportJobState.FINISHED)) {
iter.remove();
labelToJobId.remove(job.getLabel(), job.getId());
labelToExportJobId.remove(job.getLabel(), job.getId());
}
}
} finally {
@ -482,11 +511,12 @@ public class ExportMgr extends MasterDaemon {
}
}
public void replayUpdateJobState(ExportJob.StateTransfer stateTransfer) {
public void replayUpdateJobState(ExportJobStateTransfer stateTransfer) {
readLock();
try {
ExportJob job = idToJob.get(stateTransfer.getJobId());
job.updateState(stateTransfer.getState(), true);
ExportJob job = exportIdToJob.get(stateTransfer.getJobId());
job.replayExportJobState(stateTransfer.getState());
job.setStartTimeMs(stateTransfer.getStartTimeMs());
job.setFinishTimeMs(stateTransfer.getFinishTimeMs());
job.setFailMsg(stateTransfer.getFailMsg());
@ -496,11 +526,11 @@ public class ExportMgr extends MasterDaemon {
}
}
public long getJobNum(ExportJob.JobState state, long dbId) {
public long getJobNum(ExportJobState state, long dbId) {
int size = 0;
readLock();
try {
for (ExportJob job : idToJob.values()) {
for (ExportJob job : exportIdToJob.values()) {
if (job.getState() == state && job.getDbId() == dbId) {
++size;
}
@ -511,11 +541,11 @@ public class ExportMgr extends MasterDaemon {
return size;
}
public long getJobNum(ExportJob.JobState state) {
public long getJobNum(ExportJobState state) {
int size = 0;
readLock();
try {
for (ExportJob job : idToJob.values()) {
for (ExportJob job : exportIdToJob.values()) {
if (!Env.getCurrentEnv().getAccessManager().checkDbPriv(ConnectContext.get(),
Env.getCurrentEnv().getCatalogMgr().getDbNullable(job.getDbId()).getFullName(),
PrivPredicate.LOAD)) {

View File

@ -0,0 +1,171 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.load;
import org.apache.doris.analysis.OutFileClause;
import org.apache.doris.analysis.SelectStmt;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.TabletMeta;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.load.ExportFailMsg.CancelType;
import org.apache.doris.qe.AutoCloseConnectContext;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.QueryState.MysqlStateType;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.scheduler.exception.JobException;
import org.apache.doris.scheduler.executor.TransientTaskExecutor;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.TUniqueId;
import com.google.common.collect.Lists;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
@Slf4j
public class ExportTaskExecutor implements TransientTaskExecutor {
List<SelectStmt> selectStmtLists;
ExportJob exportJob;
@Setter
Long taskId;
private StmtExecutor stmtExecutor;
private AtomicBoolean isCanceled;
private AtomicBoolean isFinished;
ExportTaskExecutor(List<SelectStmt> selectStmtLists, ExportJob exportJob) {
this.selectStmtLists = selectStmtLists;
this.exportJob = exportJob;
this.isCanceled = new AtomicBoolean(false);
this.isFinished = new AtomicBoolean(false);
}
@Override
public void execute() throws JobException {
if (isCanceled.get()) {
throw new JobException("Export executor has been canceled, task id: {}", taskId);
}
exportJob.updateExportJobState(ExportJobState.EXPORTING, taskId, null, null, null);
List<OutfileInfo> outfileInfoList = Lists.newArrayList();
for (int idx = 0; idx < selectStmtLists.size(); ++idx) {
if (isCanceled.get()) {
throw new JobException("Export executor has been canceled, task id: {}", taskId);
}
// Check that the tablet versions have not changed since the export job was created
try {
Database db = Env.getCurrentEnv().getInternalCatalog().getDbOrAnalysisException(
exportJob.getTableName().getDb());
OlapTable table = db.getOlapTableOrAnalysisException(exportJob.getTableName().getTbl());
table.readLock();
try {
SelectStmt selectStmt = selectStmtLists.get(idx);
List<Long> tabletIds = selectStmt.getTableRefs().get(0).getSampleTabletIds();
for (Long tabletId : tabletIds) {
TabletMeta tabletMeta = Env.getCurrentEnv().getTabletInvertedIndex().getTabletMeta(
tabletId);
Partition partition = table.getPartition(tabletMeta.getPartitionId());
long nowVersion = partition.getVisibleVersion();
long oldVersion = exportJob.getPartitionToVersion().get(partition.getName());
if (nowVersion != oldVersion) {
exportJob.updateExportJobState(ExportJobState.CANCELLED, taskId, null,
CancelType.RUN_FAIL, "The version of tablet {" + tabletId + "} has changed");
throw new JobException("Export Job[{}]: Tablet {} has changed version, old version = {}, "
+ "now version = {}", exportJob.getId(), tabletId, oldVersion, nowVersion);
}
}
} finally {
table.readUnlock();
}
} catch (AnalysisException e) {
exportJob.updateExportJobState(ExportJobState.CANCELLED, taskId, null,
ExportFailMsg.CancelType.RUN_FAIL, e.getMessage());
throw new JobException(e);
}
try (AutoCloseConnectContext r = buildConnectContext()) {
stmtExecutor = new StmtExecutor(r.connectContext, selectStmtLists.get(idx));
stmtExecutor.execute();
if (r.connectContext.getState().getStateType() == MysqlStateType.ERR) {
exportJob.updateExportJobState(ExportJobState.CANCELLED, taskId, null,
ExportFailMsg.CancelType.RUN_FAIL, r.connectContext.getState().getErrorMessage());
return;
}
OutfileInfo outfileInfo = getOutFileInfo(r.connectContext.getResultAttachedInfo());
outfileInfoList.add(outfileInfo);
} catch (Exception e) {
exportJob.updateExportJobState(ExportJobState.CANCELLED, taskId, null,
ExportFailMsg.CancelType.RUN_FAIL, e.getMessage());
throw new JobException(e);
} finally {
// guard against an NPE if the StmtExecutor constructor itself threw
if (stmtExecutor != null) {
stmtExecutor.addProfileToSpan();
}
}
}
if (isCanceled.get()) {
throw new JobException("Export executor has been canceled, task id: {}", taskId);
}
exportJob.updateExportJobState(ExportJobState.FINISHED, taskId, outfileInfoList, null, null);
isFinished.set(true);
}
@Override
public void cancel() throws JobException {
if (isFinished.get()) {
throw new JobException("Export executor has finished, task id: {}", taskId);
}
isCanceled.set(true);
if (stmtExecutor != null) {
stmtExecutor.cancel();
}
}
private AutoCloseConnectContext buildConnectContext() {
ConnectContext connectContext = new ConnectContext();
connectContext.setSessionVariable(exportJob.getSessionVariables());
connectContext.setEnv(Env.getCurrentEnv());
connectContext.setDatabase(exportJob.getTableName().getDb());
connectContext.setQualifiedUser(exportJob.getQualifiedUser());
connectContext.setCurrentUserIdentity(exportJob.getUserIdentity());
UUID uuid = UUID.randomUUID();
TUniqueId queryId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
connectContext.setQueryId(queryId);
connectContext.setStartTime();
connectContext.setCluster(SystemInfoService.DEFAULT_CLUSTER);
return new AutoCloseConnectContext(connectContext);
}
private OutfileInfo getOutFileInfo(Map<String, String> resultAttachedInfo) {
OutfileInfo outfileInfo = new OutfileInfo();
outfileInfo.setFileNumber(resultAttachedInfo.get(OutFileClause.FILE_NUMBER));
outfileInfo.setTotalRows(resultAttachedInfo.get(OutFileClause.TOTAL_ROWS));
outfileInfo.setFileSize(resultAttachedInfo.get(OutFileClause.FILE_SIZE) + " bytes");
outfileInfo.setUrl(resultAttachedInfo.get(OutFileClause.URL));
return outfileInfo;
}
}
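
Putting addExportJobAndRegisterTask and this class together, one executor's life cycle is: register to obtain a task id, run execute() on a scheduler thread, and possibly cancel() from the CANCEL EXPORT path. A minimal driver sketch under those assumptions; the single-thread pool here merely stands in for the Job Schedule framework, which this commit does not show:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.doris.load.ExportJob;
import org.apache.doris.load.ExportTaskExecutor;
import org.apache.doris.scheduler.exception.JobException;

// Hypothetical driver; in Doris the Job Schedule framework owns this loop.
public class ExportTaskDriverSketch {
    static void runOneExportTask(ExportTaskExecutor executor) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        Long taskId = ExportJob.register.registerTask(executor);
        executor.setTaskId(taskId);
        pool.submit(() -> {
            try {
                executor.execute();   // EXPORTING, then FINISHED on success
            } catch (JobException e) {
                // execute() has already moved the job to CANCELLED before throwing
            }
        });
        pool.shutdown();
    }
}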

View File

@ -0,0 +1,37 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.load;
import com.google.gson.annotations.SerializedName;
import lombok.Data;
@Data
public class OutfileInfo {
@SerializedName("fileNumber")
private String fileNumber;
@SerializedName("totalRows")
private String totalRows;
@SerializedName("fileSize")
private String fileSize;
@SerializedName("url")
private String url;
}
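
show export serializes these fields with Gson (see new Gson().toJson(infoMap) in ExportMgr above), and a parallel job now yields one list of OutfileInfo per task, so the outfile JSON becomes a nested array. That is why the regression suites below move from json.fileNumber[0] to json.fileNumber[0][0]. A small sketch of the assumed List<List<OutfileInfo>> shape:

import com.google.common.collect.Lists;
import com.google.gson.Gson;
import java.util.List;

public class OutfileInfoJsonDemo {
    public static void main(String[] args) {
        // Assumed shape: one inner list of OutfileInfo per ExportTaskExecutor.
        OutfileInfo one = new OutfileInfo();
        one.setFileNumber("1");
        List<List<OutfileInfo>> outfiles = Lists.newArrayList(Lists.newArrayList(one));
        // Prints [[{"fileNumber":"1"}]], so the suites read json.fileNumber[0][0].
        System.out.println(new Gson().toJson(outfiles));
    }
}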

View File

@ -61,6 +61,8 @@ import org.apache.doris.journal.local.LocalJournal;
import org.apache.doris.load.DeleteHandler;
import org.apache.doris.load.DeleteInfo;
import org.apache.doris.load.ExportJob;
import org.apache.doris.load.ExportJobState;
import org.apache.doris.load.ExportJobStateTransfer;
import org.apache.doris.load.ExportMgr;
import org.apache.doris.load.LoadJob;
import org.apache.doris.load.StreamLoadRecordMgr.FetchStreamLoadRecord;
@ -365,7 +367,7 @@ public class EditLog {
break;
}
case OperationType.OP_EXPORT_UPDATE_STATE: {
ExportJob.StateTransfer op = (ExportJob.StateTransfer) journal.getData();
ExportJobStateTransfer op = (ExportJobStateTransfer) journal.getData();
ExportMgr exportMgr = env.getExportMgr();
exportMgr.replayUpdateJobState(op);
break;
@ -1461,8 +1463,8 @@ public class EditLog {
logEdit(OperationType.OP_EXPORT_CREATE, job);
}
public void logExportUpdateState(long jobId, ExportJob.JobState newState) {
ExportJob.StateTransfer transfer = new ExportJob.StateTransfer(jobId, newState);
public void logExportUpdateState(long jobId, ExportJobState newState) {
ExportJobStateTransfer transfer = new ExportJobStateTransfer(jobId, newState);
logEdit(OperationType.OP_EXPORT_UPDATE_STATE, transfer);
}
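
ExportJobStateTransfer replaces the old inner ExportJob.StateTransfer as the edit-log payload. Judging from the getters ExportMgr.replayUpdateJobState calls, it carries at least the job id, the new state, both timestamps, and the fail message. A hedged sketch of those fields; the real class may carry more and must also implement the edit-log serialization:

import com.google.gson.annotations.SerializedName;
import lombok.Getter;

// Inferred from the getters used in ExportMgr.replayUpdateJobState; illustrative only.
@Getter
public class ExportJobStateTransfer {
    @SerializedName("jobId")
    private long jobId;
    @SerializedName("state")
    private ExportJobState state;
    @SerializedName("startTimeMs")
    private long startTimeMs;
    @SerializedName("finishTimeMs")
    private long finishTimeMs;
    @SerializedName("failMsg")
    private ExportFailMsg failMsg;

    // Matches the call site in EditLog.logExportUpdateState above.
    public ExportJobStateTransfer(long jobId, ExportJobState state) {
        this.jobId = jobId;
        this.state = state;
    }
}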

View File

@ -181,7 +181,7 @@ import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.datasource.HMSExternalCatalog;
import org.apache.doris.external.iceberg.IcebergTableCreationRecord;
import org.apache.doris.load.DeleteHandler;
import org.apache.doris.load.ExportJob;
import org.apache.doris.load.ExportJobState;
import org.apache.doris.load.ExportMgr;
import org.apache.doris.load.Load;
import org.apache.doris.load.LoadJob;
@ -1922,8 +1922,8 @@ public class ShowExecutor {
ExportMgr exportMgr = env.getExportMgr();
Set<ExportJob.JobState> states = null;
ExportJob.JobState state = showExportStmt.getJobState();
Set<ExportJobState> states = null;
ExportJobState state = showExportStmt.getJobState();
if (state != null) {
states = Sets.newHashSet(state);
}

View File

@ -2213,7 +2213,8 @@ public class StmtExecutor {
private void handleExportStmt() throws Exception {
ExportStmt exportStmt = (ExportStmt) parsedStmt;
context.getEnv().getExportMgr().addExportJob(exportStmt);
context.getEnv().getExportMgr().addExportJobAndRegisterTask(exportStmt);
}
private void handleCtasStmt() {

View File

@ -30,8 +30,8 @@ import org.apache.doris.common.ThreadPoolManager;
import org.apache.doris.load.ExportFailMsg;
import org.apache.doris.load.ExportFailMsg.CancelType;
import org.apache.doris.load.ExportJob;
import org.apache.doris.load.ExportJob.JobState;
import org.apache.doris.load.ExportJob.OutfileInfo;
import org.apache.doris.load.ExportJobState;
import org.apache.doris.load.OutfileInfo;
import org.apache.doris.qe.AutoCloseConnectContext;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.QueryState.MysqlStateType;
@ -69,9 +69,9 @@ public class ExportExportingTask extends MasterTask {
private ExportFailMsg failMsg;
private ExportJob.OutfileInfo outfileInfo;
private OutfileInfo outfileInfo;
public ExportResult(boolean isFailed, ExportFailMsg failMsg, ExportJob.OutfileInfo outfileInfo) {
public ExportResult(boolean isFailed, ExportFailMsg failMsg, OutfileInfo outfileInfo) {
this.isFailed = isFailed;
this.failMsg = failMsg;
this.outfileInfo = outfileInfo;
@ -93,11 +93,11 @@ public class ExportExportingTask extends MasterTask {
@Override
protected void exec() {
if (job.getState() == JobState.IN_QUEUE) {
if (job.getState() == ExportJobState.IN_QUEUE) {
handleInQueueState();
}
if (job.getState() != ExportJob.JobState.EXPORTING) {
if (job.getState() != ExportJobState.EXPORTING) {
return;
}
LOG.info("begin execute export job in exporting state. job: {}", job);
@ -112,7 +112,7 @@ public class ExportExportingTask extends MasterTask {
List<SelectStmt> selectStmtList = job.getSelectStmtList();
int completeTaskNum = 0;
List<ExportJob.OutfileInfo> outfileInfoList = Lists.newArrayList();
List<OutfileInfo> outfileInfoList = Lists.newArrayList();
int parallelNum = selectStmtList.size();
CompletionService<ExportResult> completionService = new ExecutorCompletionService<>(exportExecPool);
@ -122,7 +122,7 @@ public class ExportExportingTask extends MasterTask {
final int idx = i;
completionService.submit(() -> {
// the user may have cancelled this job in the meantime
if (job.getState() != JobState.EXPORTING) {
if (job.getState() != ExportJobState.EXPORTING) {
return new ExportResult(true, null, null);
}
try {
@ -162,7 +162,7 @@ public class ExportExportingTask extends MasterTask {
return new ExportResult(true, new ExportFailMsg(ExportFailMsg.CancelType.RUN_FAIL,
r.connectContext.getState().getErrorMessage()), null);
}
ExportJob.OutfileInfo outfileInfo = getOutFileInfo(r.connectContext.getResultAttachedInfo());
OutfileInfo outfileInfo = getOutFileInfo(r.connectContext.getResultAttachedInfo());
return new ExportResult(false, null, outfileInfo);
} catch (Exception e) {
return new ExportResult(true, new ExportFailMsg(ExportFailMsg.CancelType.RUN_FAIL,
@ -250,8 +250,8 @@ public class ExportExportingTask extends MasterTask {
return new AutoCloseConnectContext(connectContext);
}
private ExportJob.OutfileInfo getOutFileInfo(Map<String, String> resultAttachedInfo) {
ExportJob.OutfileInfo outfileInfo = new ExportJob.OutfileInfo();
private OutfileInfo getOutFileInfo(Map<String, String> resultAttachedInfo) {
OutfileInfo outfileInfo = new OutfileInfo();
outfileInfo.setFileNumber(resultAttachedInfo.get(OutFileClause.FILE_NUMBER));
outfileInfo.setTotalRows(resultAttachedInfo.get(OutFileClause.TOTAL_ROWS));
outfileInfo.setFileSize(resultAttachedInfo.get(OutFileClause.FILE_SIZE) + " bytes");
@ -274,7 +274,7 @@ public class ExportExportingTask extends MasterTask {
// return;
// }
if (job.updateState(ExportJob.JobState.EXPORTING)) {
if (job.updateState(ExportJobState.EXPORTING)) {
LOG.info("Exchange pending status to exporting status success. job: {}", job);
}
}
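
For contrast, the legacy ExportExportingTask path shown above fans out one Callable per OutFile statement through an ExecutorCompletionService and collects ExportResults in completion order. A stripped-down sketch of that fan-out/fan-in pattern; runOneOutfile is a hypothetical stand-in for the per-statement body:

import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Generic sketch of the legacy fan-out; ExportResult is the inner class above.
public class LegacyFanOutSketch {
    static void fanOut(int parallelNum) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(parallelNum);
        CompletionService<ExportResult> cs = new ExecutorCompletionService<>(pool);
        for (int i = 0; i < parallelNum; i++) {
            final int idx = i;
            // one Callable per OutFile statement, exactly like the submit() above
            cs.submit(() -> runOneOutfile(idx));
        }
        for (int i = 0; i < parallelNum; i++) {
            ExportResult result = cs.take().get(); // collect in completion order
        }
        pool.shutdown();
    }

    // Hypothetical stand-in for the per-statement body shown in the diff.
    static ExportResult runOneOutfile(int idx) {
        return new ExportResult(false, null, null);
    }
}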

View File

@ -23,6 +23,7 @@ import org.apache.doris.common.ExceptionChecker;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.UserException;
import org.apache.doris.load.ExportJob;
import org.apache.doris.load.ExportJobState;
import org.apache.doris.load.ExportMgr;
import org.apache.doris.utframe.TestWithFeService;
@ -154,12 +155,12 @@ public class CancelExportStmtTest extends TestWithFeService {
List<ExportJob> exportJobList2 = Lists.newLinkedList();
ExportJob job1 = new ExportJob();
ExportJob job2 = new ExportJob();
job2.updateState(ExportJob.JobState.CANCELLED, true);
job2.updateState(ExportJobState.CANCELLED, true);
ExportJob job3 = new ExportJob();
job3.updateState(ExportJob.JobState.EXPORTING, false);
job3.updateState(ExportJobState.EXPORTING, false);
ExportJob job4 = new ExportJob();
ExportJob job5 = new ExportJob();
job5.updateState(ExportJob.JobState.IN_QUEUE, false);
job5.updateState(ExportJobState.IN_QUEUE, false);
exportJobList1.add(job1);
exportJobList1.add(job2);
exportJobList1.add(job3);
@ -188,15 +189,6 @@ public class CancelExportStmtTest extends TestWithFeService {
Assert.assertTrue(exportJobList1.stream().filter(filter).count() == 1);
stateStringLiteral = new StringLiteral("IN_QUEUE");
stateEqPredicate =
new BinaryPredicate(BinaryPredicate.Operator.EQ, stateSlotRef, stateStringLiteral);
stmt = new CancelExportStmt(null, stateEqPredicate);
stmt.analyze(analyzer);
filter = ExportMgr.buildCancelJobFilter(stmt);
Assert.assertTrue(exportJobList2.stream().filter(filter).count() == 1);
}
}

View File

@ -30,6 +30,7 @@ import org.apache.doris.common.VariableAnnotation;
import org.apache.doris.common.util.ProfileManager;
import org.apache.doris.common.util.RuntimeProfile;
import org.apache.doris.load.ExportJob;
import org.apache.doris.load.ExportJobState;
import org.apache.doris.task.ExportExportingTask;
import org.apache.doris.thrift.TQueryOptions;
import org.apache.doris.utframe.TestWithFeService;
@ -171,14 +172,14 @@ public class SessionVariablesTest extends TestWithFeService {
ExportStmt exportStmt = (ExportStmt)
parseAndAnalyzeStmt("EXPORT TABLE test_d.test_t1 TO \"file:///tmp/test_t1\"", connectContext);
ExportJob job = new ExportJob(1234);
job.setJob(exportStmt);
ExportJob job = exportStmt.getExportJob();
job.setId(1234);
new Expectations(job) {
{
job.getState();
minTimes = 0;
result = ExportJob.JobState.EXPORTING;
result = ExportJobState.EXPORTING;
}
};
@ -201,14 +202,14 @@ public class SessionVariablesTest extends TestWithFeService {
ExportStmt exportStmt = (ExportStmt)
parseAndAnalyzeStmt("EXPORT TABLE test_d.test_t1 TO \"file:///tmp/test_t1\"", connectContext);
ExportJob job = new ExportJob(1234);
job.setJob(exportStmt);
ExportJob job = exportStmt.getExportJob();
job.setId(1234);
new Expectations(job) {
{
job.getState();
minTimes = 0;
result = ExportJob.JobState.EXPORTING;
result = ExportJobState.EXPORTING;
}
};

View File

@ -69,7 +69,8 @@ suite("test_export_basic", "p0") {
PARTITION between_20_70 VALUES [("20"),("70")),
PARTITION more_than_70 VALUES LESS THAN ("151")
)
DISTRIBUTED BY HASH(id) PROPERTIES("replication_num" = "1");
DISTRIBUTED BY HASH(id) BUCKETS 3
PROPERTIES("replication_num" = "1");
"""
StringBuilder sb = new StringBuilder()
int i = 1

View File

@ -37,7 +37,8 @@ suite("test_export_with_hdfs", "p2") {
PARTITION between_20_70 VALUES [("20"),("70")),
PARTITION more_than_70 VALUES LESS THAN ("151")
)
DISTRIBUTED BY HASH(id) PROPERTIES("replication_num" = "1");
DISTRIBUTED BY HASH(id) BUCKETS 3
PROPERTIES("replication_num" = "1");
"""
StringBuilder sb = new StringBuilder()
int i = 1
@ -62,8 +63,9 @@ suite("test_export_with_hdfs", "p2") {
if (res[0][2] == "FINISHED") {
def json = parseJson(res[0][11])
assert json instanceof List
assertEquals("1", json.fileNumber[0])
return json.url[0];
assertEquals("1", json.fileNumber[0][0])
log.info("outfile_path: ${json.url[0][0]}")
return json.url[0][0];
} else if (res[0][2] == "CANCELLED") {
throw new IllegalStateException("""export failed: ${res[0][10]}""")
} else {

View File

@ -38,7 +38,8 @@ suite("test_export_with_s3", "p2") {
PARTITION between_20_70 VALUES [("20"),("70")),
PARTITION more_than_70 VALUES LESS THAN ("151")
)
DISTRIBUTED BY HASH(id) PROPERTIES("replication_num" = "1");
DISTRIBUTED BY HASH(id) BUCKETS 3
PROPERTIES("replication_num" = "1");
"""
StringBuilder sb = new StringBuilder()
int i = 1
@ -63,9 +64,9 @@ suite("test_export_with_s3", "p2") {
if (res[0][2] == "FINISHED") {
def json = parseJson(res[0][11])
assert json instanceof List
assertEquals("1", json.fileNumber[0])
log.info("outfile_path: ${json.url[0]}")
return json.url[0];
assertEquals("1", json.fileNumber[0][0])
log.info("outfile_path: ${json.url[0][0]}")
return json.url[0][0];
} else if (res[0][2] == "CANCELLED") {
throw new IllegalStateException("""export failed: ${res[0][10]}""")
} else {