diff --git a/fe/pom.xml b/fe/pom.xml
index 7915e2d184..34cf88a66a 100644
--- a/fe/pom.xml
+++ b/fe/pom.xml
@@ -547,6 +547,26 @@ under the License.
0.8.13
+
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-core_2.12</artifactId>
+ <version>2.4.5</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-launcher_2.12</artifactId>
+ <version>2.4.5</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-sql_2.12</artifactId>
+ <version>2.4.5</version>
+ <scope>provided</scope>
+ </dependency>
+
diff --git a/fe/src/main/cup/sql_parser.cup b/fe/src/main/cup/sql_parser.cup
index e224d156b0..dce9763ed2 100644
--- a/fe/src/main/cup/sql_parser.cup
+++ b/fe/src/main/cup/sql_parser.cup
nonterminal List<DataDescription> data_desc_list;
nonterminal LabelName job_label;
nonterminal String opt_with_label;
nonterminal String opt_system;
-nonterminal String opt_cluster;
nonterminal BrokerDesc opt_broker;
+nonterminal ResourceDesc resource_desc;
nonterminal List opt_col_list, col_list, opt_dup_keys, opt_columns_from_path;
nonterminal List opt_col_with_comment_list, col_with_comment_list;
nonterminal ColWithComment col_with_comment;
@@ -1197,6 +1197,13 @@ load_stmt ::=
{:
RESULT = new LoadStmt(label, dataDescList, broker, system, properties);
:}
+ | KW_LOAD KW_LABEL job_label:label
+ LPAREN data_desc_list:dataDescList RPAREN
+ resource_desc:resource
+ opt_properties:properties
+ {:
+ RESULT = new LoadStmt(label, dataDescList, resource, properties);
+ :}
;
job_label ::=
@@ -1364,15 +1371,16 @@ opt_broker ::=
:}
;
-opt_cluster ::=
- {:
- RESULT = null;
- :}
- | KW_BY ident_or_text:cluster
- {:
- RESULT = cluster;
- :}
- ;
+resource_desc ::=
+ KW_WITH KW_RESOURCE ident_or_text:resourceName
+ {:
+ RESULT = new ResourceDesc(resourceName, null);
+ :}
+ | KW_WITH KW_RESOURCE ident_or_text:resourceName LPAREN key_value_map:properties RPAREN
+ {:
+ RESULT = new ResourceDesc(resourceName, properties);
+ :}
+ ;
// Routine load statement
create_routine_load_stmt ::=
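The grammar change above lets a LOAD statement carry a `WITH RESOURCE` clause instead of `WITH BROKER`. The sketch below is not part of the patch: it shows how such a statement could be parsed and inspected using the parser classes referenced later in this patch (SqlParser, SqlScanner, SqlParserUtils, SqlModeHelper); the SQL text and property values are illustrative only.

```java
// Sketch only: parse a LOAD statement that uses the new WITH RESOURCE clause.
// The same parsing pattern appears in BulkLoadJob.analyze() below.
import org.apache.doris.analysis.LoadStmt;
import org.apache.doris.analysis.ResourceDesc;
import org.apache.doris.analysis.SqlParser;
import org.apache.doris.analysis.SqlScanner;
import org.apache.doris.common.util.SqlParserUtils;
import org.apache.doris.qe.SqlModeHelper;

import java.io.StringReader;

public class WithResourceParseSketch {
    public static void main(String[] args) throws Exception {
        String sql = "LOAD LABEL db1.label1 (DATA INFILE(\"hdfs://host:8020/input/file\") INTO TABLE tbl1)"
                + " WITH RESOURCE 'spark0' (\"spark.executor.memory\" = \"2g\")";
        SqlParser parser = new SqlParser(new SqlScanner(new StringReader(sql), SqlModeHelper.MODE_DEFAULT));
        LoadStmt stmt = (LoadStmt) SqlParserUtils.getStmt(parser, 0);
        ResourceDesc resourceDesc = stmt.getResourceDesc();
        System.out.println(resourceDesc.toSql()); // WITH RESOURCE 'spark0' ("spark.executor.memory" = "2g")
    }
}
```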
diff --git a/fe/src/main/java/org/apache/doris/analysis/BrokerDesc.java b/fe/src/main/java/org/apache/doris/analysis/BrokerDesc.java
index 8b3bac1cbc..8c561b47ee 100644
--- a/fe/src/main/java/org/apache/doris/analysis/BrokerDesc.java
+++ b/fe/src/main/java/org/apache/doris/analysis/BrokerDesc.java
@@ -29,6 +29,13 @@ import java.io.IOException;
import java.util.Map;
// Broker descriptor
+//
+// Broker example:
+// WITH BROKER "broker0"
+// (
+// "username" = "user0",
+// "password" = "password0"
+// )
public class BrokerDesc implements Writable {
private String name;
private Map<String, String> properties;
@@ -82,9 +89,9 @@ public class BrokerDesc implements Writable {
public String toSql() {
StringBuilder sb = new StringBuilder();
- sb.append(" WITH BROKER ").append(name);
+ sb.append("WITH BROKER ").append(name);
if (properties != null && !properties.isEmpty()) {
- PrintableMap<String, String> printableMap = new PrintableMap<>(properties, " = ", true, false);
+ PrintableMap<String, String> printableMap = new PrintableMap<>(properties, " = ", true, false, true);
sb.append(" (").append(printableMap.toString()).append(")");
}
return sb.toString();
diff --git a/fe/src/main/java/org/apache/doris/analysis/LoadStmt.java b/fe/src/main/java/org/apache/doris/analysis/LoadStmt.java
index a423d9ca72..96593b6c04 100644
--- a/fe/src/main/java/org/apache/doris/analysis/LoadStmt.java
+++ b/fe/src/main/java/org/apache/doris/analysis/LoadStmt.java
@@ -17,12 +17,16 @@
package org.apache.doris.analysis;
+import org.apache.doris.catalog.Catalog;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.PrintableMap;
import org.apache.doris.common.util.TimeUtils;
+import org.apache.doris.load.EtlJobType;
import org.apache.doris.load.Load;
+import org.apache.doris.load.loadv2.SparkLoadJob;
+import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.qe.ConnectContext;
import com.google.common.base.Function;
@@ -39,7 +43,9 @@ import java.util.Map.Entry;
// syntax:
// LOAD LABEL load_label
// (data_desc, ...)
+// [broker_desc]
// [BY cluster]
+// [resource_desc]
// [PROPERTIES (key1=value1, )]
//
// load_label:
@@ -53,6 +59,14 @@ import java.util.Map.Entry;
// [COLUMNS TERMINATED BY separator ]
// [(col1, ...)]
// [SET (k1=f1(xx), k2=f2(xx))]
+//
+// broker_desc:
+// WITH BROKER name
+// (key2=value2, ...)
+//
+// resource_desc:
+// WITH RESOURCE name
+// (key3=value3, ...)
public class LoadStmt extends DdlStmt {
public static final String TIMEOUT_PROPERTY = "timeout";
public static final String MAX_FILTER_RATIO_PROPERTY = "max_filter_ratio";
@@ -80,11 +94,16 @@ public class LoadStmt extends DdlStmt {
private final List<DataDescription> dataDescriptions;
private final BrokerDesc brokerDesc;
private final String cluster;
+ private final ResourceDesc resourceDesc;
private final Map<String, String> properties;
private String user;
+ private EtlJobType etlJobType = EtlJobType.UNKNOWN;
private String version = "v2";
+ // TODO(wyb): spark-load
+ public static boolean disableSparkLoad = true;
+
// properties set
private final static ImmutableSet<String> PROPERTIES_SET = new ImmutableSet.Builder<String>()
.add(TIMEOUT_PROPERTY)
@@ -96,13 +115,25 @@ public class LoadStmt extends DdlStmt {
.add(VERSION)
.add(TIMEZONE)
.build();
-
+
public LoadStmt(LabelName label, List<DataDescription> dataDescriptions,
BrokerDesc brokerDesc, String cluster, Map<String, String> properties) {
this.label = label;
this.dataDescriptions = dataDescriptions;
this.brokerDesc = brokerDesc;
this.cluster = cluster;
+ this.resourceDesc = null;
+ this.properties = properties;
+ this.user = null;
+ }
+
+ public LoadStmt(LabelName label, List<DataDescription> dataDescriptions,
+ ResourceDesc resourceDesc, Map<String, String> properties) {
+ this.label = label;
+ this.dataDescriptions = dataDescriptions;
+ this.brokerDesc = null;
+ this.cluster = null;
+ this.resourceDesc = resourceDesc;
this.properties = properties;
this.user = null;
}
@@ -123,6 +154,10 @@ public class LoadStmt extends DdlStmt {
return cluster;
}
+ public ResourceDesc getResourceDesc() {
+ return resourceDesc;
+ }
+
public Map<String, String> getProperties() {
return properties;
}
@@ -131,12 +166,21 @@ public class LoadStmt extends DdlStmt {
return user;
}
+ public EtlJobType getEtlJobType() {
+ return etlJobType;
+ }
+
public static void checkProperties(Map<String, String> properties) throws DdlException {
if (properties == null) {
return;
}
for (Entry<String, String> entry : properties.entrySet()) {
+ // temporary use for global dict
+ if (entry.getKey().startsWith(SparkLoadJob.BITMAP_DATA_PROPERTY)) {
+ continue;
+ }
+
if (!PROPERTIES_SET.contains(entry.getKey())) {
throw new DdlException(entry.getKey() + " is invalid property");
}
@@ -224,11 +268,34 @@ public class LoadStmt extends DdlStmt {
throw new AnalysisException("No data file in load statement.");
}
for (DataDescription dataDescription : dataDescriptions) {
- if (brokerDesc == null) {
+ if (brokerDesc == null && resourceDesc == null) {
dataDescription.setIsHadoopLoad(true);
}
dataDescription.analyze(label.getDbName());
}
+
+ if (resourceDesc != null) {
+ resourceDesc.analyze();
+ etlJobType = resourceDesc.getEtlJobType();
+ // TODO(wyb): spark-load
+ if (disableSparkLoad) {
+ throw new AnalysisException("Spark Load is coming soon");
+ }
+ // check resource usage privilege
+ if (!Catalog.getCurrentCatalog().getAuth().checkResourcePriv(ConnectContext.get(),
+ resourceDesc.getName(),
+ PrivPredicate.USAGE)) {
+ throw new AnalysisException("USAGE denied to user '" + ConnectContext.get().getQualifiedUser()
+ + "'@'" + ConnectContext.get().getRemoteIP()
+ + "' for resource '" + resourceDesc.getName() + "'");
+ }
+ } else if (brokerDesc != null) {
+ etlJobType = EtlJobType.BROKER;
+ } else {
+ // if cluster is null, use default hadoop cluster
+ // if cluster is not null, use this hadoop cluster
+ etlJobType = EtlJobType.HADOOP;
+ }
try {
checkProperties(properties);
@@ -242,7 +309,7 @@ public class LoadStmt extends DdlStmt {
@Override
public boolean needAuditEncryption() {
- if (brokerDesc != null) {
+ if (brokerDesc != null || resourceDesc != null) {
return true;
}
return false;
@@ -263,16 +330,16 @@ public class LoadStmt extends DdlStmt {
return dataDescription.toSql();
}
})).append(")");
+ if (brokerDesc != null) {
+ sb.append("\n").append(brokerDesc.toSql());
+ }
if (cluster != null) {
sb.append("\nBY '");
sb.append(cluster);
sb.append("'");
}
-
- if (brokerDesc != null) {
- sb.append("\n WITH BROKER '").append(brokerDesc.getName()).append("' (");
- sb.append(new PrintableMap<>(brokerDesc.getProperties(), "=", true, false, true));
- sb.append(")");
+ if (resourceDesc != null) {
+ sb.append("\n").append(resourceDesc.toSql());
}
if (properties != null && !properties.isEmpty()) {
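For readability, the decision LoadStmt.analyze() now makes is condensed below into one hypothetical helper. This is a sketch, not code from the patch; the class and method names are invented, and `resourceDesc.getEtlJobType()` is only meaningful after `resourceDesc.analyze()` has run.

```java
// Sketch: how a LOAD statement resolves to an EtlJobType after this change.
import org.apache.doris.analysis.BrokerDesc;
import org.apache.doris.analysis.ResourceDesc;
import org.apache.doris.load.EtlJobType;

final class EtlJobTypeResolutionSketch {
    static EtlJobType resolve(BrokerDesc brokerDesc, ResourceDesc resourceDesc) {
        if (resourceDesc != null) {
            return resourceDesc.getEtlJobType(); // SPARK when the named resource is a SPARK resource
        } else if (brokerDesc != null) {
            return EtlJobType.BROKER;
        }
        return EtlJobType.HADOOP; // no broker and no resource: the old hadoop cluster path
    }
}
```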
diff --git a/fe/src/main/java/org/apache/doris/analysis/ResourceDesc.java b/fe/src/main/java/org/apache/doris/analysis/ResourceDesc.java
new file mode 100644
index 0000000000..5d12b02349
--- /dev/null
+++ b/fe/src/main/java/org/apache/doris/analysis/ResourceDesc.java
@@ -0,0 +1,90 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.analysis;
+
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.Resource;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.util.PrintableMap;
+import org.apache.doris.load.EtlJobType;
+
+import com.google.common.collect.Maps;
+
+import java.util.Map;
+
+// Resource descriptor
+//
+// Spark example:
+// WITH RESOURCE "spark0"
+// (
+// "spark.jars" = "xxx.jar,yyy.jar",
+// "spark.files" = "/tmp/aaa,/tmp/bbb",
+// "spark.executor.memory" = "1g",
+// "spark.yarn.queue" = "queue0"
+// )
+public class ResourceDesc {
+ protected String name;
+ protected Map<String, String> properties;
+ protected EtlJobType etlJobType;
+
+ // Only used for recovery
+ private ResourceDesc() {
+ }
+
+ public ResourceDesc(String name, Map<String, String> properties) {
+ this.name = name;
+ this.properties = properties;
+ if (this.properties == null) {
+ this.properties = Maps.newHashMap();
+ }
+ this.etlJobType = EtlJobType.UNKNOWN;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public Map<String, String> getProperties() {
+ return properties;
+ }
+
+ public EtlJobType getEtlJobType() {
+ return etlJobType;
+ }
+
+ public void analyze() throws AnalysisException {
+ // check resource exist or not
+ Resource resource = Catalog.getCurrentCatalog().getResourceMgr().getResource(getName());
+ if (resource == null) {
+ throw new AnalysisException("Resource does not exist. name: " + getName());
+ }
+ if (resource.getType() == Resource.ResourceType.SPARK) {
+ etlJobType = EtlJobType.SPARK;
+ }
+ }
+
+ public String toSql() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("WITH RESOURCE '").append(name).append("'");
+ if (properties != null && !properties.isEmpty()) {
+ PrintableMap<String, String> printableMap = new PrintableMap<>(properties, " = ", true, false, true);
+ sb.append(" (").append(printableMap.toString()).append(")");
+ }
+ return sb.toString();
+ }
+}
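A minimal usage sketch of the new class, assuming a SPARK resource named "spark0" is already registered in the catalog (the same setup the new ResourceDescTest below mocks); the property values are illustrative only and this class is not part of the patch.

```java
// Sketch only: analyze a ResourceDesc against an existing SPARK resource and
// render it back to SQL. Requires a running catalog containing resource "spark0".
import org.apache.doris.analysis.ResourceDesc;
import org.apache.doris.load.EtlJobType;

import com.google.common.collect.Maps;

import java.util.Map;

public class ResourceDescUsageSketch {
    public static void main(String[] args) throws Exception {
        Map<String, String> properties = Maps.newHashMap();
        properties.put("spark.executor.memory", "2g");
        ResourceDesc desc = new ResourceDesc("spark0", properties);
        desc.analyze(); // resolves the resource and sets etlJobType to SPARK
        System.out.println(desc.getEtlJobType() == EtlJobType.SPARK); // true
        System.out.println(desc.toSql()); // WITH RESOURCE 'spark0' ("spark.executor.memory" = "2g")
    }
}
```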
diff --git a/fe/src/main/java/org/apache/doris/catalog/SparkResource.java b/fe/src/main/java/org/apache/doris/catalog/SparkResource.java
index 3d57914b2e..e89c1c996d 100644
--- a/fe/src/main/java/org/apache/doris/catalog/SparkResource.java
+++ b/fe/src/main/java/org/apache/doris/catalog/SparkResource.java
@@ -17,7 +17,7 @@
package org.apache.doris.catalog;
-//import org.apache.doris.analysis.ResourceDesc;
+import org.apache.doris.analysis.ResourceDesc;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.proc.BaseProcResult;
@@ -148,7 +148,6 @@ public class SparkResource extends Resource {
return getMaster().equalsIgnoreCase(YARN_MASTER);
}
- /*
public void update(ResourceDesc resourceDesc) throws DdlException {
Preconditions.checkState(name.equals(resourceDesc.getName()));
@@ -172,7 +171,6 @@ public class SparkResource extends Resource {
}
brokerProperties.putAll(getBrokerProperties(properties));
}
- */
@Override
protected void setProperties(Map<String, String> properties) throws DdlException {
diff --git a/fe/src/main/java/org/apache/doris/common/Config.java b/fe/src/main/java/org/apache/doris/common/Config.java
index c29921f205..8424e96714 100644
--- a/fe/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/src/main/java/org/apache/doris/common/Config.java
@@ -491,6 +491,12 @@ public class Config extends ConfigBase {
@ConfField(mutable = true, masterOnly = true)
public static int hadoop_load_default_timeout_second = 86400 * 3; // 3 day
+ /*
+ * Default spark load timeout
+ */
+ @ConfField(mutable = true, masterOnly = true)
+ public static int spark_load_default_timeout_second = 86400; // 1 day
+
/*
* Default number of waiting jobs for routine load and version 2 of load
* This is a desired number.
diff --git a/fe/src/main/java/org/apache/doris/load/EtlJobType.java b/fe/src/main/java/org/apache/doris/load/EtlJobType.java
index afc9c75bee..97f5aebf34 100644
--- a/fe/src/main/java/org/apache/doris/load/EtlJobType.java
+++ b/fe/src/main/java/org/apache/doris/load/EtlJobType.java
@@ -22,5 +22,7 @@ public enum EtlJobType {
MINI,
INSERT,
BROKER,
- DELETE
+ DELETE,
+ SPARK,
+ UNKNOWN
}
diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java b/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
index 8d817f0f84..a42f4e7a66 100644
--- a/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
+++ b/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java
@@ -17,63 +17,39 @@
package org.apache.doris.load.loadv2;
-
import org.apache.doris.analysis.BrokerDesc;
-import org.apache.doris.analysis.DataDescription;
-import org.apache.doris.analysis.LoadStmt;
-import org.apache.doris.analysis.SqlParser;
-import org.apache.doris.analysis.SqlScanner;
-import org.apache.doris.catalog.AuthorizationInfo;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.OlapTable;
-import org.apache.doris.catalog.Table;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
-import org.apache.doris.common.DdlException;
import org.apache.doris.common.DuplicatedRequestException;
-import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.common.LabelAlreadyUsedException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.UserException;
-import org.apache.doris.common.io.Text;
import org.apache.doris.common.util.LogBuilder;
import org.apache.doris.common.util.LogKey;
-import org.apache.doris.common.util.SqlParserUtils;
import org.apache.doris.load.BrokerFileGroup;
-import org.apache.doris.load.BrokerFileGroupAggInfo;
import org.apache.doris.load.BrokerFileGroupAggInfo.FileGroupAggKey;
import org.apache.doris.load.EtlJobType;
import org.apache.doris.load.FailMsg;
import org.apache.doris.metric.MetricRepo;
-import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.OriginStatement;
-import org.apache.doris.qe.SessionVariable;
-import org.apache.doris.qe.SqlModeHelper;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.transaction.BeginTransactionException;
-import org.apache.doris.transaction.TabletCommitInfo;
import org.apache.doris.transaction.TransactionState;
import org.apache.doris.transaction.TransactionState.TxnSourceType;
import org.apache.doris.transaction.TransactionState.TxnCoordinator;
import com.google.common.base.Joiner;
-import com.google.common.base.Strings;
import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.io.StringReader;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.UUID;
/**
@@ -82,129 +58,22 @@ import java.util.UUID;
* Step2: LoadLoadingTasks will be created by the method of onTaskFinished when BrokerPendingTask is finished.
* Step3: CommitAndPublicTxn will be called by the method of onTaskFinished when all of LoadLoadingTasks are finished.
*/
-public class BrokerLoadJob extends LoadJob {
+public class BrokerLoadJob extends BulkLoadJob {
private static final Logger LOG = LogManager.getLogger(BrokerLoadJob.class);
- // input params
- private BrokerDesc brokerDesc;
- // this param is used to persist the expr of columns
- // the origin stmt is persisted instead of columns expr
- // the expr of columns will be reanalyze when the log is replayed
- private OriginStatement originStmt;
-
- // include broker desc and data desc
- private BrokerFileGroupAggInfo fileGroupAggInfo = new BrokerFileGroupAggInfo();
- private List<TabletCommitInfo> commitInfos = Lists.newArrayList();
-
- // sessionVariable's name -> sessionVariable's value
- // we persist these sessionVariables due to the session is not available when replaying the job.
- private Map<String, String> sessionVariables = Maps.newHashMap();
-
// only for log replay
public BrokerLoadJob() {
super();
this.jobType = EtlJobType.BROKER;
}
- private BrokerLoadJob(long dbId, String label, BrokerDesc brokerDesc, OriginStatement originStmt)
+ public BrokerLoadJob(long dbId, String label, BrokerDesc brokerDesc, OriginStatement originStmt)
throws MetaNotFoundException {
- super(dbId, label);
+ super(dbId, label, originStmt);
this.timeoutSecond = Config.broker_load_default_timeout_second;
this.brokerDesc = brokerDesc;
- this.originStmt = originStmt;
this.jobType = EtlJobType.BROKER;
- this.authorizationInfo = gatherAuthInfo();
-
- if (ConnectContext.get() != null) {
- SessionVariable var = ConnectContext.get().getSessionVariable();
- sessionVariables.put(SessionVariable.SQL_MODE, Long.toString(var.getSqlMode()));
- } else {
- sessionVariables.put(SessionVariable.SQL_MODE, String.valueOf(SqlModeHelper.MODE_DEFAULT));
- }
- }
-
- public static BrokerLoadJob fromLoadStmt(LoadStmt stmt) throws DdlException {
- // get db id
- String dbName = stmt.getLabel().getDbName();
- Database db = Catalog.getCurrentCatalog().getDb(stmt.getLabel().getDbName());
- if (db == null) {
- throw new DdlException("Database[" + dbName + "] does not exist");
- }
-
- // create job
- try {
- BrokerLoadJob brokerLoadJob = new BrokerLoadJob(db.getId(), stmt.getLabel().getLabelName(),
- stmt.getBrokerDesc(), stmt.getOrigStmt());
- brokerLoadJob.setJobProperties(stmt.getProperties());
- brokerLoadJob.checkAndSetDataSourceInfo(db, stmt.getDataDescriptions());
- return brokerLoadJob;
- } catch (MetaNotFoundException e) {
- throw new DdlException(e.getMessage());
- }
- }
-
- private void checkAndSetDataSourceInfo(Database db, List<DataDescription> dataDescriptions) throws DdlException {
- // check data source info
- db.readLock();
- try {
- for (DataDescription dataDescription : dataDescriptions) {
- BrokerFileGroup fileGroup = new BrokerFileGroup(dataDescription);
- fileGroup.parse(db, dataDescription);
- fileGroupAggInfo.addFileGroup(fileGroup);
- }
- } finally {
- db.readUnlock();
- }
- }
-
- private AuthorizationInfo gatherAuthInfo() throws MetaNotFoundException {
- Database database = Catalog.getCurrentCatalog().getDb(dbId);
- if (database == null) {
- throw new MetaNotFoundException("Database " + dbId + "has been deleted");
- }
- return new AuthorizationInfo(database.getFullName(), getTableNames());
- }
-
- @Override
- public Set<String> getTableNamesForShow() {
- Set<String> result = Sets.newHashSet();
- Database database = Catalog.getCurrentCatalog().getDb(dbId);
- if (database == null) {
- for (long tableId : fileGroupAggInfo.getAllTableIds()) {
- result.add(String.valueOf(tableId));
- }
- return result;
- }
- for (long tableId : fileGroupAggInfo.getAllTableIds()) {
- Table table = database.getTable(tableId);
- if (table == null) {
- result.add(String.valueOf(tableId));
- } else {
- result.add(table.getName());
- }
- }
- return result;
- }
-
- @Override
- public Set<String> getTableNames() throws MetaNotFoundException{
- Set<String> result = Sets.newHashSet();
- Database database = Catalog.getCurrentCatalog().getDb(dbId);
- if (database == null) {
- throw new MetaNotFoundException("Database " + dbId + "has been deleted");
- }
- // The database will not be locked in here.
- // The getTable is a thread-safe method called without read lock of database
- for (long tableId : fileGroupAggInfo.getAllTableIds()) {
- Table table = database.getTable(tableId);
- if (table == null) {
- throw new MetaNotFoundException("Failed to find table " + tableId + " in db " + dbId);
- } else {
- result.add(table.getName());
- }
- }
- return result;
}
@Override
@@ -242,76 +111,6 @@ public class BrokerLoadJob extends LoadJob {
}
}
- @Override
- public void onTaskFailed(long taskId, FailMsg failMsg) {
- writeLock();
- try {
- // check if job has been completed
- if (isTxnDone()) {
- LOG.warn(new LogBuilder(LogKey.LOAD_JOB, id)
- .add("state", state)
- .add("error_msg", "this task will be ignored when job is: " + state)
- .build());
- return;
- }
- LoadTask loadTask = idToTasks.get(taskId);
- if (loadTask == null) {
- return;
- }
- if (loadTask.getRetryTime() <= 0) {
- unprotectedExecuteCancel(failMsg, true);
- logFinalOperation();
- return;
- } else {
- // retry task
- idToTasks.remove(loadTask.getSignature());
- if (loadTask instanceof LoadLoadingTask) {
- loadStatistic.removeLoad(((LoadLoadingTask) loadTask).getLoadId());
- }
- loadTask.updateRetryInfo();
- idToTasks.put(loadTask.getSignature(), loadTask);
- // load id will be added to loadStatistic when executing this task
- Catalog.getCurrentCatalog().getLoadTaskScheduler().submit(loadTask);
- return;
- }
- } finally {
- writeUnlock();
- }
- }
-
- /**
- * If the db or table could not be found, the Broker load job will be cancelled.
- */
- @Override
- public void analyze() {
- if (originStmt == null || Strings.isNullOrEmpty(originStmt.originStmt)) {
- return;
- }
- // Reset dataSourceInfo, it will be re-created in analyze
- fileGroupAggInfo = new BrokerFileGroupAggInfo();
- SqlParser parser = new SqlParser(new SqlScanner(new StringReader(originStmt.originStmt),
- Long.valueOf(sessionVariables.get(SessionVariable.SQL_MODE))));
- LoadStmt stmt = null;
- try {
- stmt = (LoadStmt) SqlParserUtils.getStmt(parser, originStmt.idx);
- for (DataDescription dataDescription : stmt.getDataDescriptions()) {
- dataDescription.analyzeWithoutCheckPriv();
- }
- Database db = Catalog.getCurrentCatalog().getDb(dbId);
- if (db == null) {
- throw new DdlException("Database[" + dbId + "] does not exist");
- }
- checkAndSetDataSourceInfo(db, stmt.getDataDescriptions());
- } catch (Exception e) {
- LOG.info(new LogBuilder(LogKey.LOAD_JOB, id)
- .add("origin_stmt", originStmt)
- .add("msg", "The failure happens in analyze, the load job will be cancelled with error:"
- + e.getMessage())
- .build(), e);
- cancelJobWithoutCheck(new FailMsg(FailMsg.CancelType.LOAD_RUN_FAIL, e.getMessage()), false, true);
- }
- }
-
/**
* step1: divide job into loading task
* step2: init the plan of task
@@ -514,63 +313,4 @@ public class BrokerLoadJob extends LoadJob {
}
return String.valueOf(value);
}
-
- @Override
- protected void replayTxnAttachment(TransactionState txnState) {
- if (txnState.getTxnCommitAttachment() == null) {
- // The txn attachment maybe null when broker load has been cancelled without attachment.
- // The end log of broker load has been record but the callback id of txnState hasn't been removed
- // So the callback of txn is executed when log of txn aborted is replayed.
- return;
- }
- unprotectReadEndOperation((LoadJobFinalOperation) txnState.getTxnCommitAttachment());
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- super.write(out);
- brokerDesc.write(out);
- originStmt.write(out);
-
- out.writeInt(sessionVariables.size());
- for (Map.Entry<String, String> entry : sessionVariables.entrySet()) {
- Text.writeString(out, entry.getKey());
- Text.writeString(out, entry.getValue());
- }
- }
-
- public void readFields(DataInput in) throws IOException {
- super.readFields(in);
- brokerDesc = BrokerDesc.read(in);
- if (Catalog.getCurrentCatalogJournalVersion() < FeMetaVersion.VERSION_61) {
- fileGroupAggInfo.readFields(in);
- }
-
- if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_58) {
- if (Catalog.getCurrentCatalogJournalVersion() < FeMetaVersion.VERSION_76) {
- String stmt = Text.readString(in);
- originStmt = new OriginStatement(stmt, 0);
- } else {
- originStmt = OriginStatement.read(in);
- }
- } else {
- originStmt = new OriginStatement("", 0);
- }
- // The origin stmt does not be analyzed in here.
- // The reason is that it will thrown MetaNotFoundException when the tableId could not be found by tableName.
- // The origin stmt will be analyzed after the replay is completed.
-
- if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_66) {
- int size = in.readInt();
- for (int i = 0; i < size; i++) {
- String key = Text.readString(in);
- String value = Text.readString(in);
- sessionVariables.put(key, value);
- }
- } else {
- // old version of load does not have sqlmode, set it to default
- sessionVariables.put(SessionVariable.SQL_MODE, String.valueOf(SqlModeHelper.MODE_DEFAULT));
- }
- }
-
}
diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java b/fe/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java
new file mode 100644
index 0000000000..4d658fec83
--- /dev/null
+++ b/fe/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java
@@ -0,0 +1,328 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.loadv2;
+
+import org.apache.doris.analysis.BrokerDesc;
+import org.apache.doris.analysis.DataDescription;
+import org.apache.doris.analysis.LoadStmt;
+import org.apache.doris.analysis.SqlParser;
+import org.apache.doris.analysis.SqlScanner;
+import org.apache.doris.catalog.AuthorizationInfo;
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.Database;
+import org.apache.doris.catalog.Table;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.FeMetaVersion;
+import org.apache.doris.common.MetaNotFoundException;
+import org.apache.doris.common.io.Text;
+import org.apache.doris.common.util.LogBuilder;
+import org.apache.doris.common.util.LogKey;
+import org.apache.doris.common.util.SqlParserUtils;
+import org.apache.doris.load.BrokerFileGroup;
+import org.apache.doris.load.BrokerFileGroupAggInfo;
+import org.apache.doris.load.FailMsg;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.OriginStatement;
+import org.apache.doris.qe.SessionVariable;
+import org.apache.doris.qe.SqlModeHelper;
+import org.apache.doris.transaction.TabletCommitInfo;
+import org.apache.doris.transaction.TransactionState;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.StringReader;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * parent class of BrokerLoadJob and SparkLoadJob from load stmt
+ */
+public abstract class BulkLoadJob extends LoadJob {
+ private static final Logger LOG = LogManager.getLogger(BulkLoadJob.class);
+
+ // input params
+ protected BrokerDesc brokerDesc;
+ // this param is used to persist the expr of columns
+ // the origin stmt is persisted instead of columns expr
+ // the expr of columns will be reanalyze when the log is replayed
+ private OriginStatement originStmt;
+
+ // include broker desc and data desc
+ protected BrokerFileGroupAggInfo fileGroupAggInfo = new BrokerFileGroupAggInfo();
+ protected List<TabletCommitInfo> commitInfos = Lists.newArrayList();
+
+ // sessionVariable's name -> sessionVariable's value
+ // we persist these sessionVariables due to the session is not available when replaying the job.
+ private Map<String, String> sessionVariables = Maps.newHashMap();
+
+ // only for log replay
+ public BulkLoadJob() {
+ super();
+ }
+
+ public BulkLoadJob(long dbId, String label, OriginStatement originStmt) throws MetaNotFoundException {
+ super(dbId, label);
+ this.originStmt = originStmt;
+ this.authorizationInfo = gatherAuthInfo();
+
+ if (ConnectContext.get() != null) {
+ SessionVariable var = ConnectContext.get().getSessionVariable();
+ sessionVariables.put(SessionVariable.SQL_MODE, Long.toString(var.getSqlMode()));
+ } else {
+ sessionVariables.put(SessionVariable.SQL_MODE, String.valueOf(SqlModeHelper.MODE_DEFAULT));
+ }
+ }
+
+ public static BulkLoadJob fromLoadStmt(LoadStmt stmt) throws DdlException {
+ // get db id
+ String dbName = stmt.getLabel().getDbName();
+ Database db = Catalog.getCurrentCatalog().getDb(dbName);
+ if (db == null) {
+ throw new DdlException("Database[" + dbName + "] does not exist");
+ }
+
+ // create job
+ BulkLoadJob bulkLoadJob = null;
+ try {
+ switch (stmt.getEtlJobType()) {
+ case BROKER:
+ bulkLoadJob = new BrokerLoadJob(db.getId(), stmt.getLabel().getLabelName(),
+ stmt.getBrokerDesc(), stmt.getOrigStmt());
+ break;
+ case SPARK:
+ bulkLoadJob = new SparkLoadJob(db.getId(), stmt.getLabel().getLabelName(),
+ stmt.getResourceDesc(), stmt.getOrigStmt());
+ break;
+ case MINI:
+ case DELETE:
+ case HADOOP:
+ case INSERT:
+ throw new DdlException("LoadManager only supports creating broker and spark load jobs from stmt.");
+ default:
+ throw new DdlException("Unknown load job type.");
+ }
+ bulkLoadJob.setJobProperties(stmt.getProperties());
+ bulkLoadJob.checkAndSetDataSourceInfo(db, stmt.getDataDescriptions());
+ return bulkLoadJob;
+ } catch (MetaNotFoundException e) {
+ throw new DdlException(e.getMessage());
+ }
+ }
+
+ private void checkAndSetDataSourceInfo(Database db, List<DataDescription> dataDescriptions) throws DdlException {
+ // check data source info
+ db.readLock();
+ try {
+ for (DataDescription dataDescription : dataDescriptions) {
+ BrokerFileGroup fileGroup = new BrokerFileGroup(dataDescription);
+ fileGroup.parse(db, dataDescription);
+ fileGroupAggInfo.addFileGroup(fileGroup);
+ }
+ } finally {
+ db.readUnlock();
+ }
+ }
+
+ private AuthorizationInfo gatherAuthInfo() throws MetaNotFoundException {
+ Database database = Catalog.getCurrentCatalog().getDb(dbId);
+ if (database == null) {
+ throw new MetaNotFoundException("Database " + dbId + " has been deleted");
+ }
+ return new AuthorizationInfo(database.getFullName(), getTableNames());
+ }
+
+ @Override
+ public Set<String> getTableNamesForShow() {
+ Set<String> result = Sets.newHashSet();
+ Database database = Catalog.getCurrentCatalog().getDb(dbId);
+ if (database == null) {
+ for (long tableId : fileGroupAggInfo.getAllTableIds()) {
+ result.add(String.valueOf(tableId));
+ }
+ return result;
+ }
+ for (long tableId : fileGroupAggInfo.getAllTableIds()) {
+ Table table = database.getTable(tableId);
+ if (table == null) {
+ result.add(String.valueOf(tableId));
+ } else {
+ result.add(table.getName());
+ }
+ }
+ return result;
+ }
+
+ @Override
+ public Set<String> getTableNames() throws MetaNotFoundException {
+ Set<String> result = Sets.newHashSet();
+ Database database = Catalog.getCurrentCatalog().getDb(dbId);
+ if (database == null) {
+ throw new MetaNotFoundException("Database " + dbId + " has been deleted");
+ }
+ // The database will not be locked in here.
+ // The getTable is a thread-safe method called without read lock of database
+ for (long tableId : fileGroupAggInfo.getAllTableIds()) {
+ Table table = database.getTable(tableId);
+ if (table == null) {
+ throw new MetaNotFoundException("Failed to find table " + tableId + " in db " + dbId);
+ } else {
+ result.add(table.getName());
+ }
+ }
+ return result;
+ }
+
+ @Override
+ public void onTaskFailed(long taskId, FailMsg failMsg) {
+ writeLock();
+ try {
+ // check if job has been completed
+ if (isTxnDone()) {
+ LOG.warn(new LogBuilder(LogKey.LOAD_JOB, id)
+ .add("state", state)
+ .add("error_msg", "this task will be ignored when job is: " + state)
+ .build());
+ return;
+ }
+ LoadTask loadTask = idToTasks.get(taskId);
+ if (loadTask == null) {
+ return;
+ }
+ if (loadTask.getRetryTime() <= 0) {
+ unprotectedExecuteCancel(failMsg, true);
+ logFinalOperation();
+ return;
+ } else {
+ // retry task
+ idToTasks.remove(loadTask.getSignature());
+ if (loadTask instanceof LoadLoadingTask) {
+ loadStatistic.removeLoad(((LoadLoadingTask) loadTask).getLoadId());
+ }
+ loadTask.updateRetryInfo();
+ idToTasks.put(loadTask.getSignature(), loadTask);
+ // load id will be added to loadStatistic when executing this task
+ Catalog.getCurrentCatalog().getLoadTaskScheduler().submit(loadTask);
+ return;
+ }
+ } finally {
+ writeUnlock();
+ }
+ }
+
+ /**
+ * If the db or table could not be found, the Broker load job will be cancelled.
+ */
+ @Override
+ public void analyze() {
+ if (originStmt == null || Strings.isNullOrEmpty(originStmt.originStmt)) {
+ return;
+ }
+ // Reset dataSourceInfo, it will be re-created in analyze
+ fileGroupAggInfo = new BrokerFileGroupAggInfo();
+ SqlParser parser = new SqlParser(new SqlScanner(new StringReader(originStmt.originStmt),
+ Long.valueOf(sessionVariables.get(SessionVariable.SQL_MODE))));
+ LoadStmt stmt = null;
+ try {
+ stmt = (LoadStmt) SqlParserUtils.getStmt(parser, originStmt.idx);
+ for (DataDescription dataDescription : stmt.getDataDescriptions()) {
+ dataDescription.analyzeWithoutCheckPriv();
+ }
+ Database db = Catalog.getCurrentCatalog().getDb(dbId);
+ if (db == null) {
+ throw new DdlException("Database[" + dbId + "] does not exist");
+ }
+ checkAndSetDataSourceInfo(db, stmt.getDataDescriptions());
+ } catch (Exception e) {
+ LOG.info(new LogBuilder(LogKey.LOAD_JOB, id)
+ .add("origin_stmt", originStmt)
+ .add("msg", "The failure happens in analyze, the load job will be cancelled with error:"
+ + e.getMessage())
+ .build(), e);
+ cancelJobWithoutCheck(new FailMsg(FailMsg.CancelType.LOAD_RUN_FAIL, e.getMessage()), false, true);
+ }
+ }
+
+ @Override
+ protected void replayTxnAttachment(TransactionState txnState) {
+ if (txnState.getTxnCommitAttachment() == null) {
+ // The txn attachment maybe null when broker load has been cancelled without attachment.
+ // The end log of broker load has been recorded but the callback id of txnState hasn't been removed
+ // So the callback of txn is executed when log of txn aborted is replayed.
+ return;
+ }
+ unprotectReadEndOperation((LoadJobFinalOperation) txnState.getTxnCommitAttachment());
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ super.write(out);
+ brokerDesc.write(out);
+ originStmt.write(out);
+
+ out.writeInt(sessionVariables.size());
+ for (Map.Entry<String, String> entry : sessionVariables.entrySet()) {
+ Text.writeString(out, entry.getKey());
+ Text.writeString(out, entry.getValue());
+ }
+ }
+
+ public void readFields(DataInput in) throws IOException {
+ super.readFields(in);
+ brokerDesc = BrokerDesc.read(in);
+ if (Catalog.getCurrentCatalogJournalVersion() < FeMetaVersion.VERSION_61) {
+ fileGroupAggInfo.readFields(in);
+ }
+
+ if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_58) {
+ if (Catalog.getCurrentCatalogJournalVersion() < FeMetaVersion.VERSION_76) {
+ String stmt = Text.readString(in);
+ originStmt = new OriginStatement(stmt, 0);
+ } else {
+ originStmt = OriginStatement.read(in);
+ }
+ } else {
+ originStmt = new OriginStatement("", 0);
+ }
+ // The origin stmt is not analyzed here.
+ // The reason is that it would throw a MetaNotFoundException when the tableId could not be found by tableName.
+ // The origin stmt will be analyzed after the replay is completed.
+
+ if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_66) {
+ int size = in.readInt();
+ for (int i = 0; i < size; i++) {
+ String key = Text.readString(in);
+ String value = Text.readString(in);
+ sessionVariables.put(key, value);
+ }
+ } else {
+ // old version of load does not have sqlmode, set it to default
+ sessionVariables.put(SessionVariable.SQL_MODE, String.valueOf(SqlModeHelper.MODE_DEFAULT));
+ }
+ }
+
+}
diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/JobState.java b/fe/src/main/java/org/apache/doris/load/loadv2/JobState.java
index 7f1d78821a..f25e05bb0e 100644
--- a/fe/src/main/java/org/apache/doris/load/loadv2/JobState.java
+++ b/fe/src/main/java/org/apache/doris/load/loadv2/JobState.java
@@ -21,6 +21,7 @@ package org.apache.doris.load.loadv2;
public enum JobState {
UNKNOWN, // this is only for ISSUE #2354
PENDING, // init state
+ ETL, // partition, sort and aggregate data with the etl cluster
LOADING, // job is running
COMMITTED, // transaction is committed but not visible
FINISHED, // transaction is visible and job is finished
diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java b/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
index e0cb91a716..bdeb0c2501 100644
--- a/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
+++ b/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java
@@ -17,6 +17,7 @@
package org.apache.doris.load.loadv2;
+import static org.apache.doris.load.FailMsg.CancelType.ETL_RUN_FAIL;
import static org.apache.doris.load.FailMsg.CancelType.LOAD_RUN_FAIL;
import org.apache.doris.analysis.CancelLoadStmt;
@@ -104,14 +105,14 @@ public class LoadManager implements Writable{
writeLock();
try {
checkLabelUsed(dbId, stmt.getLabel().getLabelName());
- if (stmt.getBrokerDesc() == null) {
- throw new DdlException("LoadManager only support the broker load.");
+ if (stmt.getBrokerDesc() == null && stmt.getResourceDesc() == null) {
+ throw new DdlException("LoadManager only supports broker and spark load.");
}
if (loadJobScheduler.isQueueFull()) {
throw new DdlException("There are more than " + Config.desired_max_waiting_jobs + " load jobs in waiting queue, "
+ "please retry later.");
}
- loadJob = BrokerLoadJob.fromLoadStmt(stmt);
+ loadJob = BulkLoadJob.fromLoadStmt(stmt);
createLoadJob(loadJob);
} finally {
writeUnlock();
@@ -529,7 +530,6 @@ public class LoadManager implements Writable{
*
* @param dbId
* @param label
- * @param requestId: the uuid of each txn request from BE
* @throws LabelAlreadyUsedException throw exception when label has been used by an unfinished job.
*/
private void checkLabelUsed(long dbId, String label)
diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java b/fe/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
new file mode 100644
index 0000000000..387452b976
--- /dev/null
+++ b/fe/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java
@@ -0,0 +1,248 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.load.loadv2;
+
+import com.google.common.base.Strings;
+import org.apache.doris.analysis.BrokerDesc;
+import org.apache.doris.analysis.ResourceDesc;
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.Resource;
+import org.apache.doris.catalog.SparkResource;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.MetaNotFoundException;
+import org.apache.doris.common.Pair;
+import org.apache.doris.common.io.Text;
+import org.apache.doris.load.EtlJobType;
+import org.apache.doris.load.FailMsg;
+import org.apache.doris.qe.OriginStatement;
+import org.apache.doris.task.AgentTaskQueue;
+import org.apache.doris.task.PushTask;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.apache.spark.launcher.SparkAppHandle;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * There are 4 steps in SparkLoadJob:
+ * Step1: SparkLoadPendingTask will be created by the unprotectedExecuteJob method and submits the spark etl job.
+ * Step2: LoadEtlChecker will check the spark etl job status periodically and send push tasks to BEs when the etl job is finished.
+ * Step3: LoadLoadingChecker will check the loading status periodically and commit the transaction when all push tasks are finished.
+ * Step4: PublishVersionDaemon will send publish version tasks to BEs and finish the transaction.
+ */
+public class SparkLoadJob extends BulkLoadJob {
+ private static final Logger LOG = LogManager.getLogger(SparkLoadJob.class);
+
+ // for global dict
+ public static final String BITMAP_DATA_PROPERTY = "bitmap_data";
+
+ // --- members below need persist ---
+ // create from resourceDesc when job created
+ private SparkResource sparkResource;
+ // members below updated when job state changed to etl
+ private long etlStartTimestamp = -1;
+ // for spark yarn
+ private String appId = "";
+ // spark job outputPath
+ private String etlOutputPath = "";
+ // members below updated when job state changed to loading
+ // { tableId.partitionId.indexId.bucket.schemaHash -> (etlFilePath, etlFileSize) }
+ private Map<String, Pair<String, Long>> tabletMetaToFileInfo = Maps.newHashMap();
+
+ // --- members below not persist ---
+ // temporary use
+ // one SparkLoadJob has only one table to load
+ // hivedb.table for global dict
+ private String hiveTableName = "";
+ private ResourceDesc resourceDesc;
+ // for spark standalone
+ private SparkAppHandle sparkAppHandle;
+ // for straggler wait long time to commit transaction
+ private long quorumFinishTimestamp = -1;
+ // below for push task
+ private Map<Long, Set<Long>> tableToLoadPartitions = Maps.newHashMap();
+ //private Map indexToPushBrokerReaderParams = Maps.newHashMap();
+ private Map<Long, Integer> indexToSchemaHash = Maps.newHashMap();
+ private Map<Long, Map<Long, PushTask>> tabletToSentReplicaPushTask = Maps.newHashMap();
+ private Set<Long> finishedReplicas = Sets.newHashSet();
+ private Set<Long> quorumTablets = Sets.newHashSet();
+ private Set<Long> fullTablets = Sets.newHashSet();
+
+ // only for log replay
+ public SparkLoadJob() {
+ super();
+ jobType = EtlJobType.SPARK;
+ }
+
+ public SparkLoadJob(long dbId, String label, ResourceDesc resourceDesc, OriginStatement originStmt)
+ throws MetaNotFoundException {
+ super(dbId, label, originStmt);
+ this.resourceDesc = resourceDesc;
+ timeoutSecond = Config.spark_load_default_timeout_second;
+ jobType = EtlJobType.SPARK;
+ }
+
+ public String getHiveTableName() {
+ return hiveTableName;
+ }
+
+ @Override
+ protected void setJobProperties(Map<String, String> properties) throws DdlException {
+ super.setJobProperties(properties);
+
+ // set spark resource and broker desc
+ setResourceInfo();
+
+ // global dict
+ if (properties != null) {
+ if (properties.containsKey(BITMAP_DATA_PROPERTY)) {
+ hiveTableName = properties.get(BITMAP_DATA_PROPERTY);
+ }
+ }
+ }
+
+ /**
+ * merge system conf with load stmt
+ * @throws DdlException
+ */
+ private void setResourceInfo() throws DdlException {
+ // spark resource
+ String resourceName = resourceDesc.getName();
+ Resource oriResource = Catalog.getCurrentCatalog().getResourceMgr().getResource(resourceName);
+ if (oriResource == null) {
+ throw new DdlException("Resource does not exist. name: " + resourceName);
+ }
+ sparkResource = ((SparkResource) oriResource).getCopiedResource();
+ sparkResource.update(resourceDesc);
+
+ // broker desc
+ Map<String, String> brokerProperties = sparkResource.getBrokerPropertiesWithoutPrefix();
+ brokerDesc = new BrokerDesc(sparkResource.getBroker(), brokerProperties);
+ }
+
+ /**
+ * load job already cancelled or finished, clear job below:
+ * 1. kill etl job and delete etl files
+ * 2. clear push tasks and infos that not persist
+ */
+ private void clearJob() {
+ Preconditions.checkState(state == JobState.FINISHED || state == JobState.CANCELLED);
+
+ LOG.debug("kill etl job and delete etl files. id: {}, state: {}", id, state);
+ // TODO(wyb): spark-load
+ //SparkEtlJobHandler handler = new SparkEtlJobHandler();
+ if (state == JobState.CANCELLED) {
+ if ((!Strings.isNullOrEmpty(appId) && sparkResource.isYarnMaster()) || sparkAppHandle != null) {
+ try {
+ // TODO(wyb): spark-load
+ //handler.killEtlJob(sparkAppHandle, appId, id, sparkResource);
+ } catch (Exception e) {
+ LOG.warn("kill etl job failed. id: {}, state: {}", id, state, e);
+ }
+ }
+ }
+ if (!Strings.isNullOrEmpty(etlOutputPath)) {
+ try {
+ // delete label dir, remove the last taskId dir
+ String outputPath = etlOutputPath.substring(0, etlOutputPath.lastIndexOf("/"));
+ // TODO(wyb): spark-load
+ //handler.deleteEtlOutputPath(outputPath, brokerDesc);
+ } catch (Exception e) {
+ LOG.warn("delete etl files failed. id: {}, state: {}", id, state, e);
+ }
+ }
+
+ LOG.debug("clear push tasks and infos that not persist. id: {}, state: {}", id, state);
+ writeLock();
+ try {
+ // clear push task first
+ for (Map<Long, PushTask> sentReplicaPushTask : tabletToSentReplicaPushTask.values()) {
+ for (PushTask pushTask : sentReplicaPushTask.values()) {
+ if (pushTask == null) {
+ continue;
+ }
+ AgentTaskQueue.removeTask(pushTask.getBackendId(), pushTask.getTaskType(), pushTask.getSignature());
+ }
+ }
+ // clear job infos that not persist
+ hiveTableName = "";
+ sparkAppHandle = null;
+ resourceDesc = null;
+ tableToLoadPartitions.clear();
+ //indexToPushBrokerReaderParams.clear();
+ indexToSchemaHash.clear();
+ tabletToSentReplicaPushTask.clear();
+ finishedReplicas.clear();
+ quorumTablets.clear();
+ fullTablets.clear();
+ } finally {
+ writeUnlock();
+ }
+ }
+
+ @Override
+ public void cancelJobWithoutCheck(FailMsg failMsg, boolean abortTxn, boolean needLog) {
+ super.cancelJobWithoutCheck(failMsg, abortTxn, needLog);
+ clearJob();
+ }
+
+ @Override
+ public void cancelJob(FailMsg failMsg) throws DdlException {
+ super.cancelJob(failMsg);
+ clearJob();
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ super.write(out);
+ sparkResource.write(out);
+ out.writeLong(etlStartTimestamp);
+ Text.writeString(out, appId);
+ Text.writeString(out, etlOutputPath);
+ out.writeInt(tabletMetaToFileInfo.size());
+ for (Map.Entry<String, Pair<String, Long>> entry : tabletMetaToFileInfo.entrySet()) {
+ Text.writeString(out, entry.getKey());
+ Text.writeString(out, entry.getValue().first);
+ out.writeLong(entry.getValue().second);
+ }
+ }
+
+ public void readFields(DataInput in) throws IOException {
+ super.readFields(in);
+ sparkResource = (SparkResource) Resource.read(in);
+ etlStartTimestamp = in.readLong();
+ appId = Text.readString(in);
+ etlOutputPath = Text.readString(in);
+ int size = in.readInt();
+ for (int i = 0; i < size; i++) {
+ String tabletMetaStr = Text.readString(in);
+ Pair<String, Long> fileInfo = Pair.create(Text.readString(in), in.readLong());
+ tabletMetaToFileInfo.put(tabletMetaStr, fileInfo);
+ }
+ }
+}
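The tabletMetaToFileInfo map persisted above is keyed by the string format named in its comment. No such helper exists in this patch; purely as an illustration of that key format, it would look like:

```java
// Hypothetical helper, illustration only: keys of tabletMetaToFileInfo follow
// the "tableId.partitionId.indexId.bucket.schemaHash" format mentioned above.
final class TabletMetaKeySketch {
    static String tabletMetaKey(long tableId, long partitionId, long indexId, int bucket, int schemaHash) {
        return tableId + "." + partitionId + "." + indexId + "." + bucket + "." + schemaHash;
    }
}
```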
diff --git a/fe/src/main/java/org/apache/doris/qe/DdlExecutor.java b/fe/src/main/java/org/apache/doris/qe/DdlExecutor.java
index db1a0e5e59..824e9f6a7a 100644
--- a/fe/src/main/java/org/apache/doris/qe/DdlExecutor.java
+++ b/fe/src/main/java/org/apache/doris/qe/DdlExecutor.java
@@ -118,15 +118,13 @@ public class DdlExecutor {
catalog.cancelAlter((CancelAlterTableStmt) ddlStmt);
} else if (ddlStmt instanceof LoadStmt) {
LoadStmt loadStmt = (LoadStmt) ddlStmt;
- EtlJobType jobType;
- if (loadStmt.getBrokerDesc() != null) {
- jobType = EtlJobType.BROKER;
- } else {
- if (Config.disable_hadoop_load) {
- throw new DdlException("Load job by hadoop cluster is disabled."
- + " Try using broker load. See 'help broker load;'");
- }
- jobType = EtlJobType.HADOOP;
+ EtlJobType jobType = loadStmt.getEtlJobType();
+ if (jobType == EtlJobType.UNKNOWN) {
+ throw new DdlException("Unknown load job type");
+ }
+ if (jobType == EtlJobType.HADOOP && Config.disable_hadoop_load) {
+ throw new DdlException("Load job by hadoop cluster is disabled."
+ + " Try using broker load. See 'help broker load;'");
}
if (loadStmt.getVersion().equals(Load.VERSION) || jobType == EtlJobType.HADOOP) {
catalog.getLoadManager().createLoadJobV1FromStmt(loadStmt, jobType, System.currentTimeMillis());
diff --git a/fe/src/test/java/org/apache/doris/analysis/LoadStmtTest.java b/fe/src/test/java/org/apache/doris/analysis/LoadStmtTest.java
index 48dd23b695..993fda1573 100644
--- a/fe/src/test/java/org/apache/doris/analysis/LoadStmtTest.java
+++ b/fe/src/test/java/org/apache/doris/analysis/LoadStmtTest.java
@@ -17,9 +17,14 @@
package org.apache.doris.analysis;
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.ResourceMgr;
+import org.apache.doris.catalog.SparkResource;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.UserException;
+import org.apache.doris.load.EtlJobType;
import org.apache.doris.mysql.privilege.PaloAuth;
+import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.qe.ConnectContext;
import com.google.common.collect.Lists;
@@ -68,15 +73,26 @@ public class LoadStmtTest {
}
@Test
- public void testNormal(@Injectable DataDescription desc) throws UserException, AnalysisException {
+ public void testNormal(@Injectable DataDescription desc, @Mocked Catalog catalog,
+ @Injectable ResourceMgr resourceMgr, @Injectable PaloAuth auth) throws UserException, AnalysisException {
List<DataDescription> dataDescriptionList = Lists.newArrayList();
dataDescriptionList.add(desc);
+ String resourceName = "spark0";
+ SparkResource resource = new SparkResource(resourceName);
new Expectations(){
{
desc.toSql();
minTimes = 0;
result = "XXX";
+ catalog.getResourceMgr();
+ result = resourceMgr;
+ resourceMgr.getResource(resourceName);
+ result = resource;
+ catalog.getAuth();
+ result = auth;
+ auth.checkResourcePriv((ConnectContext) any, resourceName, PrivPredicate.USAGE);
+ result = true;
}
};
@@ -88,6 +104,16 @@ public class LoadStmtTest {
Assert.assertEquals("LOAD LABEL `testCluster:testDb`.`testLabel`\n"
+ "(XXX)", stmt.toString());
+
+ // test ResourceDesc
+ // TODO(wyb): spark-load
+ LoadStmt.disableSparkLoad = false;
+ stmt = new LoadStmt(new LabelName("testDb", "testLabel"), dataDescriptionList,
+ new ResourceDesc(resourceName, null), null);
+ stmt.analyze(analyzer);
+ Assert.assertEquals(EtlJobType.SPARK, stmt.getResourceDesc().getEtlJobType());
+ Assert.assertEquals("LOAD LABEL `testCluster:testDb`.`testLabel`\n(XXX)\nWITH RESOURCE 'spark0'",
+ stmt.toString());
}
@Test(expected = AnalysisException.class)
diff --git a/fe/src/test/java/org/apache/doris/analysis/ResourceDescTest.java b/fe/src/test/java/org/apache/doris/analysis/ResourceDescTest.java
new file mode 100644
index 0000000000..717524ba29
--- /dev/null
+++ b/fe/src/test/java/org/apache/doris/analysis/ResourceDescTest.java
@@ -0,0 +1,80 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.analysis;
+
+import org.apache.doris.catalog.Catalog;
+import org.apache.doris.catalog.ResourceMgr;
+import org.apache.doris.catalog.SparkResource;
+import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.load.EtlJobType;
+
+import com.google.common.collect.Maps;
+import org.junit.Assert;
+import org.junit.Test;
+import mockit.Expectations;
+import mockit.Injectable;
+import mockit.Mocked;
+
+import java.util.Map;
+
+public class ResourceDescTest {
+
+ @Test
+ public void testNormal(@Mocked Catalog catalog, @Injectable ResourceMgr resourceMgr)
+ throws AnalysisException, DdlException {
+ String resourceName = "spark0";
+ Map<String, String> properties = Maps.newHashMap();
+ String key = "spark.executor.memory";
+ String value = "2g";
+ properties.put(key, value);
+ ResourceDesc resourceDesc = new ResourceDesc(resourceName, properties);
+ SparkResource resource = new SparkResource(resourceName);
+
+ new Expectations() {
+ {
+ catalog.getResourceMgr();
+ result = resourceMgr;
+ resourceMgr.getResource(resourceName);
+ result = resource;
+ }
+ };
+
+ resourceDesc.analyze();
+ Assert.assertEquals(resourceName, resourceDesc.getName());
+ Assert.assertEquals(value, resourceDesc.getProperties().get(key));
+ Assert.assertEquals(EtlJobType.SPARK, resourceDesc.getEtlJobType());
+ }
+
+ @Test(expected = AnalysisException.class)
+ public void testNoResource(@Mocked Catalog catalog, @Injectable ResourceMgr resourceMgr) throws AnalysisException {
+ String resourceName = "spark1";
+ ResourceDesc resourceDesc = new ResourceDesc(resourceName, null);
+
+ new Expectations() {
+ {
+ catalog.getResourceMgr();
+ result = resourceMgr;
+ resourceMgr.getResource(resourceName);
+ result = null;
+ }
+ };
+
+ resourceDesc.analyze();
+ }
+}
diff --git a/fe/src/test/java/org/apache/doris/catalog/SparkResourceTest.java b/fe/src/test/java/org/apache/doris/catalog/SparkResourceTest.java
index 30a1172533..261b973c69 100644
--- a/fe/src/test/java/org/apache/doris/catalog/SparkResourceTest.java
+++ b/fe/src/test/java/org/apache/doris/catalog/SparkResourceTest.java
@@ -20,7 +20,7 @@ package org.apache.doris.catalog;
import org.apache.doris.analysis.AccessTestUtil;
import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.CreateResourceStmt;
-//import org.apache.doris.analysis.ResourceDesc;
+import org.apache.doris.analysis.ResourceDesc;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.proc.BaseProcResult;
@@ -119,7 +119,6 @@ public class SparkResourceTest {
Assert.assertEquals(9, result.getRows().size());
}
- /*
@Test
public void testUpdate(@Injectable BrokerMgr brokerMgr, @Mocked Catalog catalog, @Injectable PaloAuth auth)
throws UserException {
@@ -156,7 +155,6 @@ public class SparkResourceTest {
Assert.assertEquals(6, map.size());
Assert.assertEquals("2g", copiedResource.getSparkConfigs().get("spark.driver.memory"));
}
- */
@Test(expected = DdlException.class)
public void testNoBroker(@Injectable BrokerMgr brokerMgr, @Mocked Catalog catalog, @Injectable PaloAuth auth)
diff --git a/fe/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java b/fe/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java
index 7991343688..4d6b700509 100644
--- a/fe/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java
+++ b/fe/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java
@@ -101,7 +101,7 @@ public class BrokerLoadJobTest {
};
try {
- BrokerLoadJob.fromLoadStmt(loadStmt);
+ BulkLoadJob.fromLoadStmt(loadStmt);
Assert.fail();
} catch (DdlException e) {
System.out.println("could not find table named " + tableName);
@@ -123,6 +123,7 @@ public class BrokerLoadJobTest {
String databaseName = "database";
List<DataDescription> dataDescriptionList = Lists.newArrayList();
dataDescriptionList.add(dataDescription);
+ BrokerDesc brokerDesc = new BrokerDesc("broker0", Maps.newHashMap());
new Expectations() {
{
@@ -153,6 +154,12 @@ public class BrokerLoadJobTest {
database.getId();
minTimes = 0;
result = dbId;
+ loadStmt.getBrokerDesc();
+ minTimes = 0;
+ result = brokerDesc;
+ loadStmt.getEtlJobType();
+ minTimes = 0;
+ result = EtlJobType.BROKER;
}
};
@@ -165,7 +172,7 @@ public class BrokerLoadJobTest {
};
try {
- BrokerLoadJob brokerLoadJob = BrokerLoadJob.fromLoadStmt(loadStmt);
+ BrokerLoadJob brokerLoadJob = (BrokerLoadJob) BulkLoadJob.fromLoadStmt(loadStmt);
Assert.assertEquals(Long.valueOf(dbId), Deencapsulation.getField(brokerLoadJob, "dbId"));
Assert.assertEquals(label, Deencapsulation.getField(brokerLoadJob, "label"));
Assert.assertEquals(JobState.PENDING, Deencapsulation.getField(brokerLoadJob, "state"));