From edfa6683fc8d3cee5d2ee29ba28bad3093efb99f Mon Sep 17 00:00:00 2001 From: wyb Date: Thu, 28 May 2020 18:04:21 +0800 Subject: [PATCH] Add create spark load job --- fe/pom.xml | 20 ++ fe/src/main/cup/sql_parser.cup | 28 +- .../org/apache/doris/analysis/BrokerDesc.java | 11 +- .../org/apache/doris/analysis/LoadStmt.java | 83 ++++- .../apache/doris/analysis/ResourceDesc.java | 90 +++++ .../apache/doris/catalog/SparkResource.java | 4 +- .../java/org/apache/doris/common/Config.java | 6 + .../org/apache/doris/load/EtlJobType.java | 4 +- .../doris/load/loadv2/BrokerLoadJob.java | 266 +------------- .../apache/doris/load/loadv2/BulkLoadJob.java | 328 ++++++++++++++++++ .../apache/doris/load/loadv2/JobState.java | 1 + .../apache/doris/load/loadv2/LoadManager.java | 8 +- .../doris/load/loadv2/SparkLoadJob.java | 248 +++++++++++++ .../java/org/apache/doris/qe/DdlExecutor.java | 16 +- .../apache/doris/analysis/LoadStmtTest.java | 28 +- .../doris/analysis/ResourceDescTest.java | 80 +++++ .../doris/catalog/SparkResourceTest.java | 4 +- .../doris/load/loadv2/BrokerLoadJobTest.java | 11 +- 18 files changed, 930 insertions(+), 306 deletions(-) create mode 100644 fe/src/main/java/org/apache/doris/analysis/ResourceDesc.java create mode 100644 fe/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java create mode 100644 fe/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java create mode 100644 fe/src/test/java/org/apache/doris/analysis/ResourceDescTest.java diff --git a/fe/pom.xml b/fe/pom.xml index 7915e2d184..34cf88a66a 100644 --- a/fe/pom.xml +++ b/fe/pom.xml @@ -547,6 +547,26 @@ under the License. 0.8.13 + + + + org.apache.spark + spark-core_2.12 + 2.4.5 + + + + org.apache.spark + spark-launcher_2.12 + 2.4.5 + + + + org.apache.spark + spark-sql_2.12 + 2.4.5 + provided + diff --git a/fe/src/main/cup/sql_parser.cup b/fe/src/main/cup/sql_parser.cup index e224d156b0..dce9763ed2 100644 --- a/fe/src/main/cup/sql_parser.cup +++ b/fe/src/main/cup/sql_parser.cup @@ -423,8 +423,8 @@ nonterminal List data_desc_list; nonterminal LabelName job_label; nonterminal String opt_with_label; nonterminal String opt_system; -nonterminal String opt_cluster; nonterminal BrokerDesc opt_broker; +nonterminal ResourceDesc resource_desc; nonterminal List opt_col_list, col_list, opt_dup_keys, opt_columns_from_path; nonterminal List opt_col_with_comment_list, col_with_comment_list; nonterminal ColWithComment col_with_comment; @@ -1197,6 +1197,13 @@ load_stmt ::= {: RESULT = new LoadStmt(label, dataDescList, broker, system, properties); :} + | KW_LOAD KW_LABEL job_label:label + LPAREN data_desc_list:dataDescList RPAREN + resource_desc:resource + opt_properties:properties + {: + RESULT = new LoadStmt(label, dataDescList, resource, properties); + :} ; job_label ::= @@ -1364,15 +1371,16 @@ opt_broker ::= :} ; -opt_cluster ::= - {: - RESULT = null; - :} - | KW_BY ident_or_text:cluster - {: - RESULT = cluster; - :} - ; +resource_desc ::= + KW_WITH KW_RESOURCE ident_or_text:resourceName + {: + RESULT = new ResourceDesc(resourceName, null); + :} + | KW_WITH KW_RESOURCE ident_or_text:resourceName LPAREN key_value_map:properties RPAREN + {: + RESULT = new ResourceDesc(resourceName, properties); + :} + ; // Routine load statement create_routine_load_stmt ::= diff --git a/fe/src/main/java/org/apache/doris/analysis/BrokerDesc.java b/fe/src/main/java/org/apache/doris/analysis/BrokerDesc.java index 8b3bac1cbc..8c561b47ee 100644 --- a/fe/src/main/java/org/apache/doris/analysis/BrokerDesc.java +++ 
b/fe/src/main/java/org/apache/doris/analysis/BrokerDesc.java @@ -29,6 +29,13 @@ import java.io.IOException; import java.util.Map; // Broker descriptor +// +// Broker example: +// WITH BROKER "broker0" +// ( +// "username" = "user0", +// "password" = "password0" +// ) public class BrokerDesc implements Writable { private String name; private Map<String, String> properties; @@ -82,9 +89,9 @@ public class BrokerDesc implements Writable { public String toSql() { StringBuilder sb = new StringBuilder(); - sb.append(" WITH BROKER ").append(name); + sb.append("WITH BROKER ").append(name); if (properties != null && !properties.isEmpty()) { - PrintableMap<String, String> printableMap = new PrintableMap<>(properties, " = ", true, false); + PrintableMap<String, String> printableMap = new PrintableMap<>(properties, " = ", true, false, true); sb.append(" (").append(printableMap.toString()).append(")"); } return sb.toString(); diff --git a/fe/src/main/java/org/apache/doris/analysis/LoadStmt.java b/fe/src/main/java/org/apache/doris/analysis/LoadStmt.java index a423d9ca72..96593b6c04 100644 --- a/fe/src/main/java/org/apache/doris/analysis/LoadStmt.java +++ b/fe/src/main/java/org/apache/doris/analysis/LoadStmt.java @@ -17,12 +17,16 @@ package org.apache.doris.analysis; +import org.apache.doris.catalog.Catalog; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.DdlException; import org.apache.doris.common.UserException; import org.apache.doris.common.util.PrintableMap; import org.apache.doris.common.util.TimeUtils; +import org.apache.doris.load.EtlJobType; import org.apache.doris.load.Load; +import org.apache.doris.load.loadv2.SparkLoadJob; +import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.qe.ConnectContext; import com.google.common.base.Function; @@ -39,7 +43,9 @@ import java.util.Map.Entry; // syntax: // LOAD LABEL load_label // (data_desc, ...) +// [broker_desc] // [BY cluster] +// [resource_desc] // [PROPERTIES (key1=value1, )] // // load_label: // db_name.label_name // // data_desc: // DATA INFILE ('file_path', ...) // [NEGATIVE] // INTO TABLE tbl_name // [PARTITION (p1, p2)] // [COLUMNS TERMINATED BY separator ] // [(col1, ...)] // [SET (k1=f1(xx), k2=f2(xx))] +// +// broker_desc: +// WITH BROKER name +// (key2=value2, ...) +// +// resource_desc: +// WITH RESOURCE name +// (key3=value3, ...)
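+//
+// A full example of the new resource form (illustrative only: the label, path, table,
+// resource name and property values below are hypothetical):
+//      LOAD LABEL example_db.label1
+//      (DATA INFILE("hdfs://127.0.0.1:9000/user/doris/input/file1") INTO TABLE my_table)
+//      WITH RESOURCE 'spark0'
+//      ("spark.executor.memory" = "2g")
+//      PROPERTIES ("timeout" = "3600")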
public class LoadStmt extends DdlStmt { public static final String TIMEOUT_PROPERTY = "timeout"; public static final String MAX_FILTER_RATIO_PROPERTY = "max_filter_ratio"; @@ -80,11 +94,16 @@ public class LoadStmt extends DdlStmt { private final List<DataDescription> dataDescriptions; private final BrokerDesc brokerDesc; private final String cluster; + private final ResourceDesc resourceDesc; private final Map<String, String> properties; private String user; + private EtlJobType etlJobType = EtlJobType.UNKNOWN; private String version = "v2"; + // TODO(wyb): spark-load + public static boolean disableSparkLoad = true; + // properties set private final static ImmutableSet<String> PROPERTIES_SET = new ImmutableSet.Builder<String>() .add(TIMEOUT_PROPERTY) @@ -96,13 +115,25 @@ public class LoadStmt extends DdlStmt { .add(VERSION) .add(TIMEZONE) .build(); - + public LoadStmt(LabelName label, List<DataDescription> dataDescriptions, BrokerDesc brokerDesc, String cluster, Map<String, String> properties) { this.label = label; this.dataDescriptions = dataDescriptions; this.brokerDesc = brokerDesc; this.cluster = cluster; + this.resourceDesc = null; + this.properties = properties; + this.user = null; + } + + public LoadStmt(LabelName label, List<DataDescription> dataDescriptions, + ResourceDesc resourceDesc, Map<String, String> properties) { + this.label = label; + this.dataDescriptions = dataDescriptions; + this.brokerDesc = null; + this.cluster = null; + this.resourceDesc = resourceDesc; this.properties = properties; this.user = null; } @@ -123,6 +154,10 @@ return cluster; } + public ResourceDesc getResourceDesc() { + return resourceDesc; + } + public Map<String, String> getProperties() { return properties; } @@ -131,12 +166,21 @@ return user; } + public EtlJobType getEtlJobType() { + return etlJobType; + } + public static void checkProperties(Map<String, String> properties) throws DdlException { if (properties == null) { return; } for (Entry<String, String> entry : properties.entrySet()) { + // temporary use for global dict + if (entry.getKey().startsWith(SparkLoadJob.BITMAP_DATA_PROPERTY)) { + continue; + } + if (!PROPERTIES_SET.contains(entry.getKey())) { throw new DdlException(entry.getKey() + " is invalid property"); } @@ -224,11 +268,34 @@ public class LoadStmt extends DdlStmt { throw new AnalysisException("No data file in load statement."); } for (DataDescription dataDescription : dataDescriptions) { - if (brokerDesc == null) { + if (brokerDesc == null && resourceDesc == null) { dataDescription.setIsHadoopLoad(true); } dataDescription.analyze(label.getDbName()); } + + if (resourceDesc != null) { + resourceDesc.analyze(); + etlJobType = resourceDesc.getEtlJobType(); + // TODO(wyb): spark-load + if (disableSparkLoad) { + throw new AnalysisException("Spark Load is coming soon"); + } + // check resource usage privilege + if (!Catalog.getCurrentCatalog().getAuth().checkResourcePriv(ConnectContext.get(), + resourceDesc.getName(), + PrivPredicate.USAGE)) { + throw new AnalysisException("USAGE denied to user '" + ConnectContext.get().getQualifiedUser() + + "'@'" + ConnectContext.get().getRemoteIP() + + "' for resource '" + resourceDesc.getName() + "'"); + } + } else if (brokerDesc != null) { + etlJobType = EtlJobType.BROKER; + } else { + // if cluster is null, use default hadoop cluster + // if cluster is not null, use this hadoop cluster + etlJobType = EtlJobType.HADOOP; + } try { checkProperties(properties); @@ -242,7 +309,7 @@ public class LoadStmt extends DdlStmt { @Override public boolean needAuditEncryption() { - if (brokerDesc != null) { + if (brokerDesc != null ||
resourceDesc != null) { return true; } return false; @@ -263,16 +330,16 @@ public class LoadStmt extends DdlStmt { return dataDescription.toSql(); } })).append(")"); + if (brokerDesc != null) { + sb.append("\n").append(brokerDesc.toSql()); + } if (cluster != null) { sb.append("\nBY '"); sb.append(cluster); sb.append("'"); } - - if (brokerDesc != null) { - sb.append("\n WITH BROKER '").append(brokerDesc.getName()).append("' ("); - sb.append(new PrintableMap<String, String>(brokerDesc.getProperties(), "=", true, false, true)); - sb.append(")"); + if (resourceDesc != null) { + sb.append("\n").append(resourceDesc.toSql()); } if (properties != null && !properties.isEmpty()) { diff --git a/fe/src/main/java/org/apache/doris/analysis/ResourceDesc.java b/fe/src/main/java/org/apache/doris/analysis/ResourceDesc.java new file mode 100644 index 0000000000..5d12b02349 --- /dev/null +++ b/fe/src/main/java/org/apache/doris/analysis/ResourceDesc.java @@ -0,0 +1,90 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.analysis; + +import org.apache.doris.catalog.Catalog; +import org.apache.doris.catalog.Resource; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.util.PrintableMap; +import org.apache.doris.load.EtlJobType; + +import com.google.common.collect.Maps; + +import java.util.Map; + +// Resource descriptor +// +// Spark example: +// WITH RESOURCE "spark0" +// ( +// "spark.jars" = "xxx.jar,yyy.jar", +// "spark.files" = "/tmp/aaa,/tmp/bbb", +// "spark.executor.memory" = "1g", +// "spark.yarn.queue" = "queue0" +// ) +public class ResourceDesc { + protected String name; + protected Map<String, String> properties; + protected EtlJobType etlJobType; + + // Only used for recovery + private ResourceDesc() { + } + + public ResourceDesc(String name, Map<String, String> properties) { + this.name = name; + this.properties = properties; + if (this.properties == null) { + this.properties = Maps.newHashMap(); + } + this.etlJobType = EtlJobType.UNKNOWN; + } + + public String getName() { + return name; + } + + public Map<String, String> getProperties() { + return properties; + } + + public EtlJobType getEtlJobType() { + return etlJobType; + } + + public void analyze() throws AnalysisException { + // check whether the resource exists + Resource resource = Catalog.getCurrentCatalog().getResourceMgr().getResource(getName()); + if (resource == null) { + throw new AnalysisException("Resource does not exist.
name: " + getName()); + } + if (resource.getType() == Resource.ResourceType.SPARK) { + etlJobType = EtlJobType.SPARK; + } + } + + public String toSql() { + StringBuilder sb = new StringBuilder(); + sb.append("WITH RESOURCE '").append(name).append("'"); + if (properties != null && !properties.isEmpty()) { + PrintableMap<String, String> printableMap = new PrintableMap<>(properties, " = ", true, false, true); + sb.append(" (").append(printableMap.toString()).append(")"); + } + return sb.toString(); + } +} diff --git a/fe/src/main/java/org/apache/doris/catalog/SparkResource.java b/fe/src/main/java/org/apache/doris/catalog/SparkResource.java index 3d57914b2e..e89c1c996d 100644 --- a/fe/src/main/java/org/apache/doris/catalog/SparkResource.java +++ b/fe/src/main/java/org/apache/doris/catalog/SparkResource.java @@ -17,7 +17,7 @@ package org.apache.doris.catalog; -//import org.apache.doris.analysis.ResourceDesc; +import org.apache.doris.analysis.ResourceDesc; import org.apache.doris.common.DdlException; import org.apache.doris.common.Pair; import org.apache.doris.common.proc.BaseProcResult; @@ -148,7 +148,6 @@ public class SparkResource extends Resource { return getMaster().equalsIgnoreCase(YARN_MASTER); } - /* public void update(ResourceDesc resourceDesc) throws DdlException { Preconditions.checkState(name.equals(resourceDesc.getName())); @@ -172,7 +171,6 @@ public class SparkResource extends Resource { } brokerProperties.putAll(getBrokerProperties(properties)); } - */ @Override protected void setProperties(Map<String, String> properties) throws DdlException { diff --git a/fe/src/main/java/org/apache/doris/common/Config.java b/fe/src/main/java/org/apache/doris/common/Config.java index c29921f205..8424e96714 100644 --- a/fe/src/main/java/org/apache/doris/common/Config.java +++ b/fe/src/main/java/org/apache/doris/common/Config.java @@ -491,6 +491,12 @@ public class Config extends ConfigBase { @ConfField(mutable = true, masterOnly = true) public static int hadoop_load_default_timeout_second = 86400 * 3; // 3 day + /* + * Default spark load timeout + */ + @ConfField(mutable = true, masterOnly = true) + public static int spark_load_default_timeout_second = 86400; // 1 day + /* * Default number of waiting jobs for routine load and version 2 of load * This is a desired number.
diff --git a/fe/src/main/java/org/apache/doris/load/EtlJobType.java b/fe/src/main/java/org/apache/doris/load/EtlJobType.java index afc9c75bee..97f5aebf34 100644 --- a/fe/src/main/java/org/apache/doris/load/EtlJobType.java +++ b/fe/src/main/java/org/apache/doris/load/EtlJobType.java @@ -22,5 +22,7 @@ public enum EtlJobType { MINI, INSERT, BROKER, - DELETE + DELETE, + SPARK, + UNKNOWN } diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java b/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java index 8d817f0f84..a42f4e7a66 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java @@ -17,63 +17,39 @@ package org.apache.doris.load.loadv2; - import org.apache.doris.analysis.BrokerDesc; -import org.apache.doris.analysis.DataDescription; -import org.apache.doris.analysis.LoadStmt; -import org.apache.doris.analysis.SqlParser; -import org.apache.doris.analysis.SqlScanner; -import org.apache.doris.catalog.AuthorizationInfo; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.OlapTable; -import org.apache.doris.catalog.Table; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; -import org.apache.doris.common.DdlException; import org.apache.doris.common.DuplicatedRequestException; -import org.apache.doris.common.FeMetaVersion; import org.apache.doris.common.LabelAlreadyUsedException; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.UserException; -import org.apache.doris.common.io.Text; import org.apache.doris.common.util.LogBuilder; import org.apache.doris.common.util.LogKey; -import org.apache.doris.common.util.SqlParserUtils; import org.apache.doris.load.BrokerFileGroup; -import org.apache.doris.load.BrokerFileGroupAggInfo; import org.apache.doris.load.BrokerFileGroupAggInfo.FileGroupAggKey; import org.apache.doris.load.EtlJobType; import org.apache.doris.load.FailMsg; import org.apache.doris.metric.MetricRepo; -import org.apache.doris.qe.ConnectContext; import org.apache.doris.qe.OriginStatement; -import org.apache.doris.qe.SessionVariable; -import org.apache.doris.qe.SqlModeHelper; import org.apache.doris.service.FrontendOptions; import org.apache.doris.thrift.TUniqueId; import org.apache.doris.transaction.BeginTransactionException; -import org.apache.doris.transaction.TabletCommitInfo; import org.apache.doris.transaction.TransactionState; import org.apache.doris.transaction.TransactionState.TxnSourceType; import org.apache.doris.transaction.TransactionState.TxnCoordinator; import com.google.common.base.Joiner; -import com.google.common.base.Strings; import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.io.StringReader; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.UUID; /** @@ -82,129 +58,22 @@ import java.util.UUID; * Step2: LoadLoadingTasks will be created by the method of onTaskFinished when BrokerPendingTask is finished. * Step3: CommitAndPublicTxn will be called by the method of onTaskFinished when all of LoadLoadingTasks are finished. 
*/ -public class BrokerLoadJob extends LoadJob { +public class BrokerLoadJob extends BulkLoadJob { private static final Logger LOG = LogManager.getLogger(BrokerLoadJob.class); - // input params - private BrokerDesc brokerDesc; - // this param is used to persist the expr of columns - // the origin stmt is persisted instead of columns expr - // the expr of columns will be reanalyze when the log is replayed - private OriginStatement originStmt; - - // include broker desc and data desc - private BrokerFileGroupAggInfo fileGroupAggInfo = new BrokerFileGroupAggInfo(); - private List commitInfos = Lists.newArrayList(); - - // sessionVariable's name -> sessionVariable's value - // we persist these sessionVariables due to the session is not available when replaying the job. - private Map sessionVariables = Maps.newHashMap(); - // only for log replay public BrokerLoadJob() { super(); this.jobType = EtlJobType.BROKER; } - private BrokerLoadJob(long dbId, String label, BrokerDesc brokerDesc, OriginStatement originStmt) + public BrokerLoadJob(long dbId, String label, BrokerDesc brokerDesc, OriginStatement originStmt) throws MetaNotFoundException { - super(dbId, label); + super(dbId, label, originStmt); this.timeoutSecond = Config.broker_load_default_timeout_second; this.brokerDesc = brokerDesc; - this.originStmt = originStmt; this.jobType = EtlJobType.BROKER; - this.authorizationInfo = gatherAuthInfo(); - - if (ConnectContext.get() != null) { - SessionVariable var = ConnectContext.get().getSessionVariable(); - sessionVariables.put(SessionVariable.SQL_MODE, Long.toString(var.getSqlMode())); - } else { - sessionVariables.put(SessionVariable.SQL_MODE, String.valueOf(SqlModeHelper.MODE_DEFAULT)); - } - } - - public static BrokerLoadJob fromLoadStmt(LoadStmt stmt) throws DdlException { - // get db id - String dbName = stmt.getLabel().getDbName(); - Database db = Catalog.getCurrentCatalog().getDb(stmt.getLabel().getDbName()); - if (db == null) { - throw new DdlException("Database[" + dbName + "] does not exist"); - } - - // create job - try { - BrokerLoadJob brokerLoadJob = new BrokerLoadJob(db.getId(), stmt.getLabel().getLabelName(), - stmt.getBrokerDesc(), stmt.getOrigStmt()); - brokerLoadJob.setJobProperties(stmt.getProperties()); - brokerLoadJob.checkAndSetDataSourceInfo(db, stmt.getDataDescriptions()); - return brokerLoadJob; - } catch (MetaNotFoundException e) { - throw new DdlException(e.getMessage()); - } - } - - private void checkAndSetDataSourceInfo(Database db, List dataDescriptions) throws DdlException { - // check data source info - db.readLock(); - try { - for (DataDescription dataDescription : dataDescriptions) { - BrokerFileGroup fileGroup = new BrokerFileGroup(dataDescription); - fileGroup.parse(db, dataDescription); - fileGroupAggInfo.addFileGroup(fileGroup); - } - } finally { - db.readUnlock(); - } - } - - private AuthorizationInfo gatherAuthInfo() throws MetaNotFoundException { - Database database = Catalog.getCurrentCatalog().getDb(dbId); - if (database == null) { - throw new MetaNotFoundException("Database " + dbId + "has been deleted"); - } - return new AuthorizationInfo(database.getFullName(), getTableNames()); - } - - @Override - public Set getTableNamesForShow() { - Set result = Sets.newHashSet(); - Database database = Catalog.getCurrentCatalog().getDb(dbId); - if (database == null) { - for (long tableId : fileGroupAggInfo.getAllTableIds()) { - result.add(String.valueOf(tableId)); - } - return result; - } - for (long tableId : fileGroupAggInfo.getAllTableIds()) { - Table table = 
database.getTable(tableId); - if (table == null) { - result.add(String.valueOf(tableId)); - } else { - result.add(table.getName()); - } - } - return result; - } - - @Override - public Set getTableNames() throws MetaNotFoundException{ - Set result = Sets.newHashSet(); - Database database = Catalog.getCurrentCatalog().getDb(dbId); - if (database == null) { - throw new MetaNotFoundException("Database " + dbId + "has been deleted"); - } - // The database will not be locked in here. - // The getTable is a thread-safe method called without read lock of database - for (long tableId : fileGroupAggInfo.getAllTableIds()) { - Table table = database.getTable(tableId); - if (table == null) { - throw new MetaNotFoundException("Failed to find table " + tableId + " in db " + dbId); - } else { - result.add(table.getName()); - } - } - return result; } @Override @@ -242,76 +111,6 @@ public class BrokerLoadJob extends LoadJob { } } - @Override - public void onTaskFailed(long taskId, FailMsg failMsg) { - writeLock(); - try { - // check if job has been completed - if (isTxnDone()) { - LOG.warn(new LogBuilder(LogKey.LOAD_JOB, id) - .add("state", state) - .add("error_msg", "this task will be ignored when job is: " + state) - .build()); - return; - } - LoadTask loadTask = idToTasks.get(taskId); - if (loadTask == null) { - return; - } - if (loadTask.getRetryTime() <= 0) { - unprotectedExecuteCancel(failMsg, true); - logFinalOperation(); - return; - } else { - // retry task - idToTasks.remove(loadTask.getSignature()); - if (loadTask instanceof LoadLoadingTask) { - loadStatistic.removeLoad(((LoadLoadingTask) loadTask).getLoadId()); - } - loadTask.updateRetryInfo(); - idToTasks.put(loadTask.getSignature(), loadTask); - // load id will be added to loadStatistic when executing this task - Catalog.getCurrentCatalog().getLoadTaskScheduler().submit(loadTask); - return; - } - } finally { - writeUnlock(); - } - } - - /** - * If the db or table could not be found, the Broker load job will be cancelled. 
- */ - @Override - public void analyze() { - if (originStmt == null || Strings.isNullOrEmpty(originStmt.originStmt)) { - return; - } - // Reset dataSourceInfo, it will be re-created in analyze - fileGroupAggInfo = new BrokerFileGroupAggInfo(); - SqlParser parser = new SqlParser(new SqlScanner(new StringReader(originStmt.originStmt), - Long.valueOf(sessionVariables.get(SessionVariable.SQL_MODE)))); - LoadStmt stmt = null; - try { - stmt = (LoadStmt) SqlParserUtils.getStmt(parser, originStmt.idx); - for (DataDescription dataDescription : stmt.getDataDescriptions()) { - dataDescription.analyzeWithoutCheckPriv(); - } - Database db = Catalog.getCurrentCatalog().getDb(dbId); - if (db == null) { - throw new DdlException("Database[" + dbId + "] does not exist"); - } - checkAndSetDataSourceInfo(db, stmt.getDataDescriptions()); - } catch (Exception e) { - LOG.info(new LogBuilder(LogKey.LOAD_JOB, id) - .add("origin_stmt", originStmt) - .add("msg", "The failure happens in analyze, the load job will be cancelled with error:" - + e.getMessage()) - .build(), e); - cancelJobWithoutCheck(new FailMsg(FailMsg.CancelType.LOAD_RUN_FAIL, e.getMessage()), false, true); - } - } - /** * step1: divide job into loading task * step2: init the plan of task @@ -514,63 +313,4 @@ public class BrokerLoadJob extends LoadJob { } return String.valueOf(value); } - - @Override - protected void replayTxnAttachment(TransactionState txnState) { - if (txnState.getTxnCommitAttachment() == null) { - // The txn attachment maybe null when broker load has been cancelled without attachment. - // The end log of broker load has been record but the callback id of txnState hasn't been removed - // So the callback of txn is executed when log of txn aborted is replayed. - return; - } - unprotectReadEndOperation((LoadJobFinalOperation) txnState.getTxnCommitAttachment()); - } - - @Override - public void write(DataOutput out) throws IOException { - super.write(out); - brokerDesc.write(out); - originStmt.write(out); - - out.writeInt(sessionVariables.size()); - for (Map.Entry entry : sessionVariables.entrySet()) { - Text.writeString(out, entry.getKey()); - Text.writeString(out, entry.getValue()); - } - } - - public void readFields(DataInput in) throws IOException { - super.readFields(in); - brokerDesc = BrokerDesc.read(in); - if (Catalog.getCurrentCatalogJournalVersion() < FeMetaVersion.VERSION_61) { - fileGroupAggInfo.readFields(in); - } - - if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_58) { - if (Catalog.getCurrentCatalogJournalVersion() < FeMetaVersion.VERSION_76) { - String stmt = Text.readString(in); - originStmt = new OriginStatement(stmt, 0); - } else { - originStmt = OriginStatement.read(in); - } - } else { - originStmt = new OriginStatement("", 0); - } - // The origin stmt does not be analyzed in here. - // The reason is that it will thrown MetaNotFoundException when the tableId could not be found by tableName. - // The origin stmt will be analyzed after the replay is completed. 
- - if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_66) { - int size = in.readInt(); - for (int i = 0; i < size; i++) { - String key = Text.readString(in); - String value = Text.readString(in); - sessionVariables.put(key, value); - } - } else { - // old version of load does not have sqlmode, set it to default - sessionVariables.put(SessionVariable.SQL_MODE, String.valueOf(SqlModeHelper.MODE_DEFAULT)); - } - } - } diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java b/fe/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java new file mode 100644 index 0000000000..4d658fec83 --- /dev/null +++ b/fe/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java @@ -0,0 +1,328 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.load.loadv2; + +import org.apache.doris.analysis.BrokerDesc; +import org.apache.doris.analysis.DataDescription; +import org.apache.doris.analysis.LoadStmt; +import org.apache.doris.analysis.SqlParser; +import org.apache.doris.analysis.SqlScanner; +import org.apache.doris.catalog.AuthorizationInfo; +import org.apache.doris.catalog.Catalog; +import org.apache.doris.catalog.Database; +import org.apache.doris.catalog.Table; +import org.apache.doris.common.DdlException; +import org.apache.doris.common.FeMetaVersion; +import org.apache.doris.common.MetaNotFoundException; +import org.apache.doris.common.io.Text; +import org.apache.doris.common.util.LogBuilder; +import org.apache.doris.common.util.LogKey; +import org.apache.doris.common.util.SqlParserUtils; +import org.apache.doris.load.BrokerFileGroup; +import org.apache.doris.load.BrokerFileGroupAggInfo; +import org.apache.doris.load.FailMsg; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.OriginStatement; +import org.apache.doris.qe.SessionVariable; +import org.apache.doris.qe.SqlModeHelper; +import org.apache.doris.transaction.TabletCommitInfo; +import org.apache.doris.transaction.TransactionState; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import com.google.common.base.Strings; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.io.StringReader; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * parent class of BrokerLoadJob and SparkLoadJob from load stmt + */ +public abstract class BulkLoadJob extends LoadJob { + private static final Logger LOG = LogManager.getLogger(BulkLoadJob.class); + + // input params + protected BrokerDesc brokerDesc; + // this param is used to persist the expr of columns + // the origin stmt is persisted instead 
of columns expr + // the expr of columns will be re-analyzed when the log is replayed + private OriginStatement originStmt; + + // include broker desc and data desc + protected BrokerFileGroupAggInfo fileGroupAggInfo = new BrokerFileGroupAggInfo(); + protected List<TabletCommitInfo> commitInfos = Lists.newArrayList(); + + // sessionVariable's name -> sessionVariable's value + // we persist these sessionVariables because the session is not available when the job is replayed. + private Map<String, String> sessionVariables = Maps.newHashMap(); + + // only for log replay + public BulkLoadJob() { + super(); + } + + public BulkLoadJob(long dbId, String label, OriginStatement originStmt) throws MetaNotFoundException { + super(dbId, label); + this.originStmt = originStmt; + this.authorizationInfo = gatherAuthInfo(); + + if (ConnectContext.get() != null) { + SessionVariable var = ConnectContext.get().getSessionVariable(); + sessionVariables.put(SessionVariable.SQL_MODE, Long.toString(var.getSqlMode())); + } else { + sessionVariables.put(SessionVariable.SQL_MODE, String.valueOf(SqlModeHelper.MODE_DEFAULT)); + } + } + + public static BulkLoadJob fromLoadStmt(LoadStmt stmt) throws DdlException { + // get db id + String dbName = stmt.getLabel().getDbName(); + Database db = Catalog.getCurrentCatalog().getDb(dbName); + if (db == null) { + throw new DdlException("Database[" + dbName + "] does not exist"); + } + + // create job + BulkLoadJob bulkLoadJob = null; + try { + switch (stmt.getEtlJobType()) { + case BROKER: + bulkLoadJob = new BrokerLoadJob(db.getId(), stmt.getLabel().getLabelName(), + stmt.getBrokerDesc(), stmt.getOrigStmt()); + break; + case SPARK: + bulkLoadJob = new SparkLoadJob(db.getId(), stmt.getLabel().getLabelName(), + stmt.getResourceDesc(), stmt.getOrigStmt()); + break; + case MINI: + case DELETE: + case HADOOP: + case INSERT: + throw new DdlException("LoadManager only supports creating broker and spark load jobs from a load stmt."); + default: + throw new DdlException("Unknown load job type."); + } + bulkLoadJob.setJobProperties(stmt.getProperties()); + bulkLoadJob.checkAndSetDataSourceInfo(db, stmt.getDataDescriptions()); + return bulkLoadJob; + } catch (MetaNotFoundException e) { + throw new DdlException(e.getMessage()); + } + } + + private void checkAndSetDataSourceInfo(Database db, List<DataDescription> dataDescriptions) throws DdlException { + // check data source info + db.readLock(); + try { + for (DataDescription dataDescription : dataDescriptions) { + BrokerFileGroup fileGroup = new BrokerFileGroup(dataDescription); + fileGroup.parse(db, dataDescription); + fileGroupAggInfo.addFileGroup(fileGroup); + } + } finally { + db.readUnlock(); + } + } + + private AuthorizationInfo gatherAuthInfo() throws MetaNotFoundException { + Database database = Catalog.getCurrentCatalog().getDb(dbId); + if (database == null) { + throw new MetaNotFoundException("Database " + dbId + " has been deleted"); + } + return new AuthorizationInfo(database.getFullName(), getTableNames()); + } + + @Override + public Set<String> getTableNamesForShow() { + Set<String> result = Sets.newHashSet(); + Database database = Catalog.getCurrentCatalog().getDb(dbId); + if (database == null) { + for (long tableId : fileGroupAggInfo.getAllTableIds()) { + result.add(String.valueOf(tableId)); + } + return result; + } + for (long tableId : fileGroupAggInfo.getAllTableIds()) { + Table table = database.getTable(tableId); + if (table == null) { + result.add(String.valueOf(tableId)); + } else { + result.add(table.getName()); + } + } + return result; + } + + @Override + public Set<String> getTableNames() throws
MetaNotFoundException { + Set<String> result = Sets.newHashSet(); + Database database = Catalog.getCurrentCatalog().getDb(dbId); + if (database == null) { + throw new MetaNotFoundException("Database " + dbId + " has been deleted"); + } + // The database will not be locked in here. + // getTable() is thread-safe and can be called without holding the database read lock + for (long tableId : fileGroupAggInfo.getAllTableIds()) { + Table table = database.getTable(tableId); + if (table == null) { + throw new MetaNotFoundException("Failed to find table " + tableId + " in db " + dbId); + } else { + result.add(table.getName()); + } + } + return result; + } + + @Override + public void onTaskFailed(long taskId, FailMsg failMsg) { + writeLock(); + try { + // check if job has been completed + if (isTxnDone()) { + LOG.warn(new LogBuilder(LogKey.LOAD_JOB, id) + .add("state", state) + .add("error_msg", "this task will be ignored when job is: " + state) + .build()); + return; + } + LoadTask loadTask = idToTasks.get(taskId); + if (loadTask == null) { + return; + } + if (loadTask.getRetryTime() <= 0) { + unprotectedExecuteCancel(failMsg, true); + logFinalOperation(); + return; + } else { + // retry task + idToTasks.remove(loadTask.getSignature()); + if (loadTask instanceof LoadLoadingTask) { + loadStatistic.removeLoad(((LoadLoadingTask) loadTask).getLoadId()); + } + loadTask.updateRetryInfo(); + idToTasks.put(loadTask.getSignature(), loadTask); + // load id will be added to loadStatistic when executing this task + Catalog.getCurrentCatalog().getLoadTaskScheduler().submit(loadTask); + return; + } + } finally { + writeUnlock(); + } + } + + /** + * If the db or table could not be found, the bulk load job will be cancelled. + */ + @Override + public void analyze() { + if (originStmt == null || Strings.isNullOrEmpty(originStmt.originStmt)) { + return; + } + // Reset dataSourceInfo, it will be re-created in analyze + fileGroupAggInfo = new BrokerFileGroupAggInfo(); + SqlParser parser = new SqlParser(new SqlScanner(new StringReader(originStmt.originStmt), + Long.valueOf(sessionVariables.get(SessionVariable.SQL_MODE)))); + LoadStmt stmt = null; + try { + stmt = (LoadStmt) SqlParserUtils.getStmt(parser, originStmt.idx); + for (DataDescription dataDescription : stmt.getDataDescriptions()) { + dataDescription.analyzeWithoutCheckPriv(); + } + Database db = Catalog.getCurrentCatalog().getDb(dbId); + if (db == null) { + throw new DdlException("Database[" + dbId + "] does not exist"); + } + checkAndSetDataSourceInfo(db, stmt.getDataDescriptions()); + } catch (Exception e) { + LOG.info(new LogBuilder(LogKey.LOAD_JOB, id) + .add("origin_stmt", originStmt) + .add("msg", "The failure happens in analyze, the load job will be cancelled with error:" + + e.getMessage()) + .build(), e); + cancelJobWithoutCheck(new FailMsg(FailMsg.CancelType.LOAD_RUN_FAIL, e.getMessage()), false, true); + } + } + + @Override + protected void replayTxnAttachment(TransactionState txnState) { + if (txnState.getTxnCommitAttachment() == null) { + // The txn attachment may be null when the broker load has been cancelled without attachment. + // The end log of the broker load has been recorded but the callback id of txnState hasn't been removed + // So the callback of txn is executed when the log of the aborted txn is replayed.
+ return; + } + unprotectReadEndOperation((LoadJobFinalOperation) txnState.getTxnCommitAttachment()); + } + + @Override + public void write(DataOutput out) throws IOException { + super.write(out); + brokerDesc.write(out); + originStmt.write(out); + + out.writeInt(sessionVariables.size()); + for (Map.Entry<String, String> entry : sessionVariables.entrySet()) { + Text.writeString(out, entry.getKey()); + Text.writeString(out, entry.getValue()); + } + } + + public void readFields(DataInput in) throws IOException { + super.readFields(in); + brokerDesc = BrokerDesc.read(in); + if (Catalog.getCurrentCatalogJournalVersion() < FeMetaVersion.VERSION_61) { + fileGroupAggInfo.readFields(in); + } + + if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_58) { + if (Catalog.getCurrentCatalogJournalVersion() < FeMetaVersion.VERSION_76) { + String stmt = Text.readString(in); + originStmt = new OriginStatement(stmt, 0); + } else { + originStmt = OriginStatement.read(in); + } + } else { + originStmt = new OriginStatement("", 0); + } + // The origin stmt is not analyzed here. + // The reason is that it would throw MetaNotFoundException when the tableId could not be found by tableName. + // The origin stmt will be analyzed after the replay is completed. + + if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_66) { + int size = in.readInt(); + for (int i = 0; i < size; i++) { + String key = Text.readString(in); + String value = Text.readString(in); + sessionVariables.put(key, value); + } + } else { + // old version of load does not have sqlmode, set it to default + sessionVariables.put(SessionVariable.SQL_MODE, String.valueOf(SqlModeHelper.MODE_DEFAULT)); + } + } + +} diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/JobState.java b/fe/src/main/java/org/apache/doris/load/loadv2/JobState.java index 7f1d78821a..f25e05bb0e 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/JobState.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/JobState.java @@ -21,6 +21,7 @@ package org.apache.doris.load.loadv2; public enum JobState { UNKNOWN, // this is only for ISSUE #2354 PENDING, // init state + ETL, // partition, sort and aggregate data with the ETL cluster LOADING, // job is running COMMITTED, // transaction is committed but not visible FINISHED, // transaction is visible and job is finished diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java b/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java index e0cb91a716..bdeb0c2501 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java @@ -17,6 +17,7 @@ package org.apache.doris.load.loadv2; +import static org.apache.doris.load.FailMsg.CancelType.ETL_RUN_FAIL; import static org.apache.doris.load.FailMsg.CancelType.LOAD_RUN_FAIL; import org.apache.doris.analysis.CancelLoadStmt; @@ -104,14 +105,14 @@ public class LoadManager implements Writable{ writeLock(); try { checkLabelUsed(dbId, stmt.getLabel().getLabelName()); - if (stmt.getBrokerDesc() == null) { - throw new DdlException("LoadManager only support the broker load."); + if (stmt.getBrokerDesc() == null && stmt.getResourceDesc() == null) { + throw new DdlException("LoadManager only supports broker and spark load."); } if (loadJobScheduler.isQueueFull()) { throw new DdlException("There are more then " + Config.desired_max_waiting_jobs + " load jobs in waiting queue, " + "please retry later."); } - loadJob = BrokerLoadJob.fromLoadStmt(stmt); +
loadJob = BulkLoadJob.fromLoadStmt(stmt); createLoadJob(loadJob); } finally { writeUnlock(); } @@ -529,7 +530,6 @@ public class LoadManager implements Writable{ * * @param dbId * @param label - * @param requestId: the uuid of each txn request from BE * @throws LabelAlreadyUsedException throw exception when label has been used by an unfinished job. */ private void checkLabelUsed(long dbId, String label) diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java b/fe/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java new file mode 100644 index 0000000000..387452b976 --- /dev/null +++ b/fe/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java @@ -0,0 +1,248 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.load.loadv2; + +import com.google.common.base.Strings; +import org.apache.doris.analysis.BrokerDesc; +import org.apache.doris.analysis.ResourceDesc; +import org.apache.doris.catalog.Catalog; +import org.apache.doris.catalog.Resource; +import org.apache.doris.catalog.SparkResource; +import org.apache.doris.common.Config; +import org.apache.doris.common.DdlException; +import org.apache.doris.common.MetaNotFoundException; +import org.apache.doris.common.Pair; +import org.apache.doris.common.io.Text; +import org.apache.doris.load.EtlJobType; +import org.apache.doris.load.FailMsg; +import org.apache.doris.qe.OriginStatement; +import org.apache.doris.task.AgentTaskQueue; +import org.apache.doris.task.PushTask; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.spark.launcher.SparkAppHandle; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.Map; +import java.util.Set; + +/** + * There are 4 steps in SparkLoadJob: + * Step1: SparkLoadPendingTask is created by the unprotectedExecuteJob method and submits the spark etl job. + * Step2: LoadEtlChecker will check the spark etl job status periodically and send push tasks to BEs when the spark etl job is finished. + * Step3: LoadLoadingChecker will check the loading status periodically and commit the transaction when all push tasks are finished. + * Step4: PublishVersionDaemon will send publish version tasks to BEs and finish the transaction.
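 + *
 + * The resulting job state flow is roughly PENDING -> ETL -> LOADING -> COMMITTED -> FINISHED, with CANCELLED on failure (see JobState).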
+ */ +public class SparkLoadJob extends BulkLoadJob { + private static final Logger LOG = LogManager.getLogger(SparkLoadJob.class); + + // for global dict + public static final String BITMAP_DATA_PROPERTY = "bitmap_data"; + + // --- members below need to be persisted --- + // created from resourceDesc when the job is created + private SparkResource sparkResource; + // members below are updated when the job state changes to ETL + private long etlStartTimestamp = -1; + // for spark yarn + private String appId = ""; + // spark job outputPath + private String etlOutputPath = ""; + // members below are updated when the job state changes to LOADING + // { tableId.partitionId.indexId.bucket.schemaHash -> (etlFilePath, etlFileSize) } + private Map<String, Pair<String, Long>> tabletMetaToFileInfo = Maps.newHashMap(); + + // --- members below are not persisted --- + // temporary use + // one SparkLoadJob has only one table to load + // hivedb.table for global dict + private String hiveTableName = ""; + private ResourceDesc resourceDesc; + // for spark standalone + private SparkAppHandle sparkAppHandle; + // for stragglers that wait a long time to commit the transaction + private long quorumFinishTimestamp = -1; + // below for push task + private Map<Long, Set<Long>> tableToLoadPartitions = Maps.newHashMap(); + //private Map indexToPushBrokerReaderParams = Maps.newHashMap(); + private Map<Long, Integer> indexToSchemaHash = Maps.newHashMap(); + private Map<Long, Map<Long, PushTask>> tabletToSentReplicaPushTask = Maps.newHashMap(); + private Set<Long> finishedReplicas = Sets.newHashSet(); + private Set<Long> quorumTablets = Sets.newHashSet(); + private Set<Long> fullTablets = Sets.newHashSet(); + + // only for log replay + public SparkLoadJob() { + super(); + jobType = EtlJobType.SPARK; + } + + public SparkLoadJob(long dbId, String label, ResourceDesc resourceDesc, OriginStatement originStmt) + throws MetaNotFoundException { + super(dbId, label, originStmt); + this.resourceDesc = resourceDesc; + timeoutSecond = Config.spark_load_default_timeout_second; + jobType = EtlJobType.SPARK; + } + + public String getHiveTableName() { + return hiveTableName; + } + + @Override + protected void setJobProperties(Map<String, String> properties) throws DdlException { + super.setJobProperties(properties); + + // set spark resource and broker desc + setResourceInfo(); + + // global dict + if (properties != null) { + if (properties.containsKey(BITMAP_DATA_PROPERTY)) { + hiveTableName = properties.get(BITMAP_DATA_PROPERTY); + } + } + } + + /** + * merge system conf with load stmt + * @throws DdlException + */ + private void setResourceInfo() throws DdlException { + // spark resource + String resourceName = resourceDesc.getName(); + Resource oriResource = Catalog.getCurrentCatalog().getResourceMgr().getResource(resourceName); + if (oriResource == null) { + throw new DdlException("Resource does not exist. name: " + resourceName); + } + sparkResource = ((SparkResource) oriResource).getCopiedResource(); + sparkResource.update(resourceDesc); + + // broker desc + Map<String, String> brokerProperties = sparkResource.getBrokerPropertiesWithoutPrefix(); + brokerDesc = new BrokerDesc(sparkResource.getBroker(), brokerProperties); + } + + /** + * The load job has already been cancelled or finished; clean up as follows: + * 1. kill the etl job and delete etl files + * 2. clear push tasks and infos that are not persisted + */ + private void clearJob() { + Preconditions.checkState(state == JobState.FINISHED || state == JobState.CANCELLED); + + LOG.debug("kill etl job and delete etl files.
id: {}, state: {}", id, state); + // TODO(wyb): spark-load + //SparkEtlJobHandler handler = new SparkEtlJobHandler(); + if (state == JobState.CANCELLED) { + if ((!Strings.isNullOrEmpty(appId) && sparkResource.isYarnMaster()) || sparkAppHandle != null) { + try { + // TODO(wyb): spark-load + //handler.killEtlJob(sparkAppHandle, appId, id, sparkResource); + } catch (Exception e) { + LOG.warn("kill etl job failed. id: {}, state: {}", id, state, e); + } + } + } + if (!Strings.isNullOrEmpty(etlOutputPath)) { + try { + // delete label dir, remove the last taskId dir + String outputPath = etlOutputPath.substring(0, etlOutputPath.lastIndexOf("/")); + // TODO(wyb): spark-load + //handler.deleteEtlOutputPath(outputPath, brokerDesc); + } catch (Exception e) { + LOG.warn("delete etl files failed. id: {}, state: {}", id, state, e); + } + } + + LOG.debug("clear push tasks and infos that are not persisted. id: {}, state: {}", id, state); + writeLock(); + try { + // clear push task first + for (Map<Long, PushTask> sentReplicaPushTask : tabletToSentReplicaPushTask.values()) { + for (PushTask pushTask : sentReplicaPushTask.values()) { + if (pushTask == null) { + continue; + } + AgentTaskQueue.removeTask(pushTask.getBackendId(), pushTask.getTaskType(), pushTask.getSignature()); + } + } + // clear job infos that are not persisted + hiveTableName = ""; + sparkAppHandle = null; + resourceDesc = null; + tableToLoadPartitions.clear(); + //indexToPushBrokerReaderParams.clear(); + indexToSchemaHash.clear(); + tabletToSentReplicaPushTask.clear(); + finishedReplicas.clear(); + quorumTablets.clear(); + fullTablets.clear(); + } finally { + writeUnlock(); + } + } + + @Override + public void cancelJobWithoutCheck(FailMsg failMsg, boolean abortTxn, boolean needLog) { + super.cancelJobWithoutCheck(failMsg, abortTxn, needLog); + clearJob(); + } + + @Override + public void cancelJob(FailMsg failMsg) throws DdlException { + super.cancelJob(failMsg); + clearJob(); + } + + @Override + public void write(DataOutput out) throws IOException { + super.write(out); + sparkResource.write(out); + out.writeLong(etlStartTimestamp); + Text.writeString(out, appId); + Text.writeString(out, etlOutputPath); + out.writeInt(tabletMetaToFileInfo.size()); + for (Map.Entry<String, Pair<String, Long>> entry : tabletMetaToFileInfo.entrySet()) { + Text.writeString(out, entry.getKey()); + Text.writeString(out, entry.getValue().first); + out.writeLong(entry.getValue().second); + } + } + + public void readFields(DataInput in) throws IOException { + super.readFields(in); + sparkResource = (SparkResource) Resource.read(in); + etlStartTimestamp = in.readLong(); + appId = Text.readString(in); + etlOutputPath = Text.readString(in); + int size = in.readInt(); + for (int i = 0; i < size; i++) { + String tabletMetaStr = Text.readString(in); + Pair<String, Long> fileInfo = Pair.create(Text.readString(in), in.readLong()); + tabletMetaToFileInfo.put(tabletMetaStr, fileInfo); + } + } +} diff --git a/fe/src/main/java/org/apache/doris/qe/DdlExecutor.java b/fe/src/main/java/org/apache/doris/qe/DdlExecutor.java index db1a0e5e59..824e9f6a7a 100644 --- a/fe/src/main/java/org/apache/doris/qe/DdlExecutor.java +++ b/fe/src/main/java/org/apache/doris/qe/DdlExecutor.java @@ -118,15 +118,13 @@ public class DdlExecutor { catalog.cancelAlter((CancelAlterTableStmt) ddlStmt); } else if (ddlStmt instanceof LoadStmt) { LoadStmt loadStmt = (LoadStmt) ddlStmt; - EtlJobType jobType; - if (loadStmt.getBrokerDesc() != null) { - jobType = EtlJobType.BROKER; - } else { - if (Config.disable_hadoop_load) { - throw new DdlException("Load job by hadoop cluster is
disabled." - + " Try using broker load. See 'help broker load;'"); - } - jobType = EtlJobType.HADOOP; + EtlJobType jobType = loadStmt.getEtlJobType(); + if (jobType == EtlJobType.UNKNOWN) { + throw new DdlException("Unknown load job type"); + } + if (jobType == EtlJobType.HADOOP && Config.disable_hadoop_load) { + throw new DdlException("Load job by hadoop cluster is disabled." + + " Try using broker load. See 'help broker load;'"); } if (loadStmt.getVersion().equals(Load.VERSION) || jobType == EtlJobType.HADOOP) { catalog.getLoadManager().createLoadJobV1FromStmt(loadStmt, jobType, System.currentTimeMillis()); diff --git a/fe/src/test/java/org/apache/doris/analysis/LoadStmtTest.java b/fe/src/test/java/org/apache/doris/analysis/LoadStmtTest.java index 48dd23b695..993fda1573 100644 --- a/fe/src/test/java/org/apache/doris/analysis/LoadStmtTest.java +++ b/fe/src/test/java/org/apache/doris/analysis/LoadStmtTest.java @@ -17,9 +17,14 @@ package org.apache.doris.analysis; +import org.apache.doris.catalog.Catalog; +import org.apache.doris.catalog.ResourceMgr; +import org.apache.doris.catalog.SparkResource; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.UserException; +import org.apache.doris.load.EtlJobType; import org.apache.doris.mysql.privilege.PaloAuth; +import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.qe.ConnectContext; import com.google.common.collect.Lists; @@ -68,15 +73,26 @@ public class LoadStmtTest { } @Test - public void testNormal(@Injectable DataDescription desc) throws UserException, AnalysisException { + public void testNormal(@Injectable DataDescription desc, @Mocked Catalog catalog, + @Injectable ResourceMgr resourceMgr, @Injectable PaloAuth auth) throws UserException, AnalysisException { List dataDescriptionList = Lists.newArrayList(); dataDescriptionList.add(desc); + String resourceName = "spark0"; + SparkResource resource = new SparkResource(resourceName); new Expectations(){ { desc.toSql(); minTimes = 0; result = "XXX"; + catalog.getResourceMgr(); + result = resourceMgr; + resourceMgr.getResource(resourceName); + result = resource; + catalog.getAuth(); + result = auth; + auth.checkResourcePriv((ConnectContext) any, resourceName, PrivPredicate.USAGE); + result = true; } }; @@ -88,6 +104,16 @@ public class LoadStmtTest { Assert.assertEquals("LOAD LABEL `testCluster:testDb`.`testLabel`\n" + "(XXX)", stmt.toString()); + + // test ResourceDesc + // TODO(wyb): spark-load + LoadStmt.disableSparkLoad = false; + stmt = new LoadStmt(new LabelName("testDb", "testLabel"), dataDescriptionList, + new ResourceDesc(resourceName, null), null); + stmt.analyze(analyzer); + Assert.assertEquals(EtlJobType.SPARK, stmt.getResourceDesc().getEtlJobType()); + Assert.assertEquals("LOAD LABEL `testCluster:testDb`.`testLabel`\n(XXX)\nWITH RESOURCE 'spark0'", + stmt.toString()); } @Test(expected = AnalysisException.class) diff --git a/fe/src/test/java/org/apache/doris/analysis/ResourceDescTest.java b/fe/src/test/java/org/apache/doris/analysis/ResourceDescTest.java new file mode 100644 index 0000000000..717524ba29 --- /dev/null +++ b/fe/src/test/java/org/apache/doris/analysis/ResourceDescTest.java @@ -0,0 +1,80 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.analysis; + +import org.apache.doris.catalog.Catalog; +import org.apache.doris.catalog.ResourceMgr; +import org.apache.doris.catalog.SparkResource; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.DdlException; +import org.apache.doris.load.EtlJobType; + +import com.google.common.collect.Maps; +import org.junit.Assert; +import org.junit.Test; +import mockit.Expectations; +import mockit.Injectable; +import mockit.Mocked; + +import java.util.Map; + +public class ResourceDescTest { + + @Test + public void testNormal(@Mocked Catalog catalog, @Injectable ResourceMgr resourceMgr) + throws AnalysisException, DdlException { + String resourceName = "spark0"; + Map properties = Maps.newHashMap(); + String key = "spark.executor.memory"; + String value = "2g"; + properties.put(key, value); + ResourceDesc resourceDesc = new ResourceDesc(resourceName, properties); + SparkResource resource = new SparkResource(resourceName); + + new Expectations() { + { + catalog.getResourceMgr(); + result = resourceMgr; + resourceMgr.getResource(resourceName); + result = resource; + } + }; + + resourceDesc.analyze(); + Assert.assertEquals(resourceName, resourceDesc.getName()); + Assert.assertEquals(value, resourceDesc.getProperties().get(key)); + Assert.assertEquals(EtlJobType.SPARK, resourceDesc.getEtlJobType()); + } + + @Test(expected = AnalysisException.class) + public void testNoResource(@Mocked Catalog catalog, @Injectable ResourceMgr resourceMgr) throws AnalysisException { + String resourceName = "spark1"; + ResourceDesc resourceDesc = new ResourceDesc(resourceName, null); + + new Expectations() { + { + catalog.getResourceMgr(); + result = resourceMgr; + resourceMgr.getResource(resourceName); + result = null; + } + }; + + resourceDesc.analyze(); + } +} diff --git a/fe/src/test/java/org/apache/doris/catalog/SparkResourceTest.java b/fe/src/test/java/org/apache/doris/catalog/SparkResourceTest.java index 30a1172533..261b973c69 100644 --- a/fe/src/test/java/org/apache/doris/catalog/SparkResourceTest.java +++ b/fe/src/test/java/org/apache/doris/catalog/SparkResourceTest.java @@ -20,7 +20,7 @@ package org.apache.doris.catalog; import org.apache.doris.analysis.AccessTestUtil; import org.apache.doris.analysis.Analyzer; import org.apache.doris.analysis.CreateResourceStmt; -//import org.apache.doris.analysis.ResourceDesc; +import org.apache.doris.analysis.ResourceDesc; import org.apache.doris.common.DdlException; import org.apache.doris.common.UserException; import org.apache.doris.common.proc.BaseProcResult; @@ -119,7 +119,6 @@ public class SparkResourceTest { Assert.assertEquals(9, result.getRows().size()); } - /* @Test public void testUpdate(@Injectable BrokerMgr brokerMgr, @Mocked Catalog catalog, @Injectable PaloAuth auth) throws UserException { @@ -156,7 +155,6 @@ public class SparkResourceTest { Assert.assertEquals(6, map.size()); Assert.assertEquals("2g", 
copiedResource.getSparkConfigs().get("spark.driver.memory")); } - */ @Test(expected = DdlException.class) public void testNoBroker(@Injectable BrokerMgr brokerMgr, @Mocked Catalog catalog, @Injectable PaloAuth auth) diff --git a/fe/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java b/fe/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java index 7991343688..4d6b700509 100644 --- a/fe/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java +++ b/fe/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java @@ -101,7 +101,7 @@ public class BrokerLoadJobTest { }; try { - BrokerLoadJob.fromLoadStmt(loadStmt); + BulkLoadJob.fromLoadStmt(loadStmt); Assert.fail(); } catch (DdlException e) { System.out.println("could not find table named " + tableName); @@ -123,6 +123,7 @@ public class BrokerLoadJobTest { String databaseName = "database"; List dataDescriptionList = Lists.newArrayList(); dataDescriptionList.add(dataDescription); + BrokerDesc brokerDesc = new BrokerDesc("broker0", Maps.newHashMap()); new Expectations() { { @@ -153,6 +154,12 @@ public class BrokerLoadJobTest { database.getId(); minTimes = 0; result = dbId; + loadStmt.getBrokerDesc(); + minTimes = 0; + result = brokerDesc; + loadStmt.getEtlJobType(); + minTimes = 0; + result = EtlJobType.BROKER; } }; @@ -165,7 +172,7 @@ public class BrokerLoadJobTest { }; try { - BrokerLoadJob brokerLoadJob = BrokerLoadJob.fromLoadStmt(loadStmt); + BrokerLoadJob brokerLoadJob = (BrokerLoadJob) BulkLoadJob.fromLoadStmt(loadStmt); Assert.assertEquals(Long.valueOf(dbId), Deencapsulation.getField(brokerLoadJob, "dbId")); Assert.assertEquals(label, Deencapsulation.getField(brokerLoadJob, "label")); Assert.assertEquals(JobState.PENDING, Deencapsulation.getField(brokerLoadJob, "state"));
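A note on privileges and gating: LoadStmt.analyze() now requires USAGE privilege on the referenced resource (checkResourcePriv), so a non-admin user would first need a grant along the lines of the following (exact syntax depends on the resource-privilege support this patch assumes; user and resource names are hypothetical):

    GRANT USAGE_PRIV ON RESOURCE 'spark0' TO 'user0'@'%';

Spark load creation also remains gated behind LoadStmt.disableSparkLoad ("Spark Load is coming soon") until the remaining TODO(wyb) spark-load pieces land.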