Add create spark load job

wyb
2020-05-28 18:04:21 +08:00
parent 73c3de4313
commit edfa6683fc
18 changed files with 930 additions and 306 deletions

View File

@ -547,6 +547,26 @@ under the License.
<version>0.8.13</version>
</dependency>
<!-- spark -->
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core_2.12 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>2.4.5</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-launcher_2.12 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-launcher_2.12</artifactId>
<version>2.4.5</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql_2.12 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>2.4.5</version>
<scope>provided</scope>
</dependency>
</dependencies>
<build>

View File

@ -423,8 +423,8 @@ nonterminal List<DataDescription> data_desc_list;
nonterminal LabelName job_label;
nonterminal String opt_with_label;
nonterminal String opt_system;
nonterminal String opt_cluster;
nonterminal BrokerDesc opt_broker;
nonterminal ResourceDesc resource_desc;
nonterminal List<String> opt_col_list, col_list, opt_dup_keys, opt_columns_from_path;
nonterminal List<ColWithComment> opt_col_with_comment_list, col_with_comment_list;
nonterminal ColWithComment col_with_comment;
@ -1197,6 +1197,13 @@ load_stmt ::=
{:
RESULT = new LoadStmt(label, dataDescList, broker, system, properties);
:}
| KW_LOAD KW_LABEL job_label:label
LPAREN data_desc_list:dataDescList RPAREN
resource_desc:resource
opt_properties:properties
{:
RESULT = new LoadStmt(label, dataDescList, resource, properties);
:}
;
job_label ::=
@ -1364,15 +1371,16 @@ opt_broker ::=
:}
;
opt_cluster ::=
{:
RESULT = null;
:}
| KW_BY ident_or_text:cluster
{:
RESULT = cluster;
:}
;
resource_desc ::=
KW_WITH KW_RESOURCE ident_or_text:resourceName
{:
RESULT = new ResourceDesc(resourceName, null);
:}
| KW_WITH KW_RESOURCE ident_or_text:resourceName LPAREN key_value_map:properties RPAREN
{:
RESULT = new ResourceDesc(resourceName, properties);
:}
;
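// Illustrative inputs accepted by the resource_desc rule above (the resource
// name and properties are hypothetical, not taken from this commit):
//   WITH RESOURCE "spark0"
//   WITH RESOURCE "spark0" ("spark.executor.memory" = "2g", "spark.driver.memory" = "1g")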
// Routine load statement
create_routine_load_stmt ::=

View File

@ -29,6 +29,13 @@ import java.io.IOException;
import java.util.Map;
// Broker descriptor
//
// Broker example:
// WITH BROKER "broker0"
// (
// "username" = "user0",
// "password" = "password0"
// )
public class BrokerDesc implements Writable {
private String name;
private Map<String, String> properties;
@ -82,9 +89,9 @@ public class BrokerDesc implements Writable {
public String toSql() {
StringBuilder sb = new StringBuilder();
sb.append(" WITH BROKER ").append(name);
sb.append("WITH BROKER ").append(name);
if (properties != null && !properties.isEmpty()) {
PrintableMap<String, String> printableMap = new PrintableMap<>(properties, " = ", true, false);
PrintableMap<String, String> printableMap = new PrintableMap<>(properties, " = ", true, false, true);
sb.append(" (").append(printableMap.toString()).append(")");
}
return sb.toString();

View File

@ -17,12 +17,16 @@
package org.apache.doris.analysis;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.PrintableMap;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.load.EtlJobType;
import org.apache.doris.load.Load;
import org.apache.doris.load.loadv2.SparkLoadJob;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.qe.ConnectContext;
import com.google.common.base.Function;
@ -39,7 +43,9 @@ import java.util.Map.Entry;
// syntax:
// LOAD LABEL load_label
// (data_desc, ...)
// [broker_desc]
// [BY cluster]
// [resource_desc]
// [PROPERTIES (key1=value1, ...)]
//
// load_label:
@ -53,6 +59,14 @@ import java.util.Map.Entry;
// [COLUMNS TERMINATED BY separator ]
// [(col1, ...)]
// [SET (k1=f1(xx), k2=f2(xx))]
//
// broker_desc:
// WITH BROKER name
// (key2=value2, ...)
//
// resource_desc:
// WITH RESOURCE name
// (key3=value3, ...)
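//
// A complete statement built from the pieces above, for illustration only
// (the label, path, table and property values are hypothetical):
// LOAD LABEL example_db.example_label
// (
//     DATA INFILE("hdfs://host:port/user/palo/data/input/file1")
//     INTO TABLE example_tbl
// )
// WITH RESOURCE "spark0"
// ("spark.executor.memory" = "2g")
// PROPERTIES ("timeout" = "3600")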
public class LoadStmt extends DdlStmt {
public static final String TIMEOUT_PROPERTY = "timeout";
public static final String MAX_FILTER_RATIO_PROPERTY = "max_filter_ratio";
@ -80,11 +94,16 @@ public class LoadStmt extends DdlStmt {
private final List<DataDescription> dataDescriptions;
private final BrokerDesc brokerDesc;
private final String cluster;
private final ResourceDesc resourceDesc;
private final Map<String, String> properties;
private String user;
private EtlJobType etlJobType = EtlJobType.UNKNOWN;
private String version = "v2";
// TODO(wyb): spark-load
public static boolean disableSparkLoad = true;
// properties set
private final static ImmutableSet<String> PROPERTIES_SET = new ImmutableSet.Builder<String>()
.add(TIMEOUT_PROPERTY)
@ -96,13 +115,25 @@ public class LoadStmt extends DdlStmt {
.add(VERSION)
.add(TIMEZONE)
.build();
public LoadStmt(LabelName label, List<DataDescription> dataDescriptions,
BrokerDesc brokerDesc, String cluster, Map<String, String> properties) {
this.label = label;
this.dataDescriptions = dataDescriptions;
this.brokerDesc = brokerDesc;
this.cluster = cluster;
this.resourceDesc = null;
this.properties = properties;
this.user = null;
}
public LoadStmt(LabelName label, List<DataDescription> dataDescriptions,
ResourceDesc resourceDesc, Map<String, String> properties) {
this.label = label;
this.dataDescriptions = dataDescriptions;
this.brokerDesc = null;
this.cluster = null;
this.resourceDesc = resourceDesc;
this.properties = properties;
this.user = null;
}
@ -123,6 +154,10 @@ public class LoadStmt extends DdlStmt {
return cluster;
}
public ResourceDesc getResourceDesc() {
return resourceDesc;
}
public Map<String, String> getProperties() {
return properties;
}
@ -131,12 +166,21 @@ public class LoadStmt extends DdlStmt {
return user;
}
public EtlJobType getEtlJobType() {
return etlJobType;
}
public static void checkProperties(Map<String, String> properties) throws DdlException {
if (properties == null) {
return;
}
for (Entry<String, String> entry : properties.entrySet()) {
// temporary use for global dict
if (entry.getKey().startsWith(SparkLoadJob.BITMAP_DATA_PROPERTY)) {
continue;
}
if (!PROPERTIES_SET.contains(entry.getKey())) {
throw new DdlException(entry.getKey() + " is invalid property");
}
@ -224,11 +268,34 @@ public class LoadStmt extends DdlStmt {
throw new AnalysisException("No data file in load statement.");
}
for (DataDescription dataDescription : dataDescriptions) {
if (brokerDesc == null) {
if (brokerDesc == null && resourceDesc == null) {
dataDescription.setIsHadoopLoad(true);
}
dataDescription.analyze(label.getDbName());
}
if (resourceDesc != null) {
resourceDesc.analyze();
etlJobType = resourceDesc.getEtlJobType();
// TODO(wyb): spark-load
if (disableSparkLoad) {
throw new AnalysisException("Spark Load is comming soon");
}
// check resource usage privilege
if (!Catalog.getCurrentCatalog().getAuth().checkResourcePriv(ConnectContext.get(),
resourceDesc.getName(),
PrivPredicate.USAGE)) {
throw new AnalysisException("USAGE denied to user '" + ConnectContext.get().getQualifiedUser()
+ "'@'" + ConnectContext.get().getRemoteIP()
+ "' for resource '" + resourceDesc.getName() + "'");
}
} else if (brokerDesc != null) {
etlJobType = EtlJobType.BROKER;
} else {
// if cluster is null, use default hadoop cluster
// if cluster is not null, use this hadoop cluster
etlJobType = EtlJobType.HADOOP;
}
try {
checkProperties(properties);
@ -242,7 +309,7 @@ public class LoadStmt extends DdlStmt {
@Override
public boolean needAuditEncryption() {
if (brokerDesc != null) {
if (brokerDesc != null || resourceDesc != null) {
return true;
}
return false;
@ -263,16 +330,16 @@ public class LoadStmt extends DdlStmt {
return dataDescription.toSql();
}
})).append(")");
if (brokerDesc != null) {
sb.append("\n").append(brokerDesc.toSql());
}
if (cluster != null) {
sb.append("\nBY '");
sb.append(cluster);
sb.append("'");
}
if (brokerDesc != null) {
sb.append("\n WITH BROKER '").append(brokerDesc.getName()).append("' (");
sb.append(new PrintableMap<String, String>(brokerDesc.getProperties(), "=", true, false, true));
sb.append(")");
if (resourceDesc != null) {
sb.append("\n").append(resourceDesc.toSql());
}
if (properties != null && !properties.isEmpty()) {

View File

@ -0,0 +1,90 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.analysis;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Resource;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.util.PrintableMap;
import org.apache.doris.load.EtlJobType;
import com.google.common.collect.Maps;
import java.util.Map;
// Resource descriptor
//
// Spark example:
// WITH RESOURCE "spark0"
// (
// "spark.jars" = "xxx.jar,yyy.jar",
// "spark.files" = "/tmp/aaa,/tmp/bbb",
// "spark.executor.memory" = "1g",
// "spark.yarn.queue" = "queue0"
// )
public class ResourceDesc {
protected String name;
protected Map<String, String> properties;
protected EtlJobType etlJobType;
// Only used for recovery
private ResourceDesc() {
}
public ResourceDesc(String name, Map<String, String> properties) {
this.name = name;
this.properties = properties;
if (this.properties == null) {
this.properties = Maps.newHashMap();
}
this.etlJobType = EtlJobType.UNKNOWN;
}
public String getName() {
return name;
}
public Map<String, String> getProperties() {
return properties;
}
public EtlJobType getEtlJobType() {
return etlJobType;
}
public void analyze() throws AnalysisException {
// check resource exist or not
Resource resource = Catalog.getCurrentCatalog().getResourceMgr().getResource(getName());
if (resource == null) {
throw new AnalysisException("Resource does not exist. name: " + getName());
}
if (resource.getType() == Resource.ResourceType.SPARK) {
etlJobType = EtlJobType.SPARK;
}
}
public String toSql() {
StringBuilder sb = new StringBuilder();
sb.append("WITH RESOURCE '").append(name).append("'");
if (properties != null && !properties.isEmpty()) {
PrintableMap<String, String> printableMap = new PrintableMap<>(properties, " = ", true, false, true);
sb.append(" (").append(printableMap.toString()).append(")");
}
return sb.toString();
}
}

View File

@ -17,7 +17,7 @@
package org.apache.doris.catalog;
//import org.apache.doris.analysis.ResourceDesc;
import org.apache.doris.analysis.ResourceDesc;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.proc.BaseProcResult;
@ -148,7 +148,6 @@ public class SparkResource extends Resource {
return getMaster().equalsIgnoreCase(YARN_MASTER);
}
/*
public void update(ResourceDesc resourceDesc) throws DdlException {
Preconditions.checkState(name.equals(resourceDesc.getName()));
@ -172,7 +171,6 @@ public class SparkResource extends Resource {
}
brokerProperties.putAll(getBrokerProperties(properties));
}
*/
@Override
protected void setProperties(Map<String, String> properties) throws DdlException {

View File

@ -491,6 +491,12 @@ public class Config extends ConfigBase {
@ConfField(mutable = true, masterOnly = true)
public static int hadoop_load_default_timeout_second = 86400 * 3; // 3 day
/*
* Default spark load timeout
*/
@ConfField(mutable = true, masterOnly = true)
public static int spark_load_default_timeout_second = 86400; // 1 day
/*
* Default number of waiting jobs for routine load and version 2 of load
* This is a desired number.

View File

@ -22,5 +22,7 @@ public enum EtlJobType {
MINI,
INSERT,
BROKER,
DELETE
DELETE,
SPARK,
UNKNOWN
}

View File

@ -17,63 +17,39 @@
package org.apache.doris.load.loadv2;
import org.apache.doris.analysis.BrokerDesc;
import org.apache.doris.analysis.DataDescription;
import org.apache.doris.analysis.LoadStmt;
import org.apache.doris.analysis.SqlParser;
import org.apache.doris.analysis.SqlScanner;
import org.apache.doris.catalog.AuthorizationInfo;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Table;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.DuplicatedRequestException;
import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.common.LabelAlreadyUsedException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.util.LogBuilder;
import org.apache.doris.common.util.LogKey;
import org.apache.doris.common.util.SqlParserUtils;
import org.apache.doris.load.BrokerFileGroup;
import org.apache.doris.load.BrokerFileGroupAggInfo;
import org.apache.doris.load.BrokerFileGroupAggInfo.FileGroupAggKey;
import org.apache.doris.load.EtlJobType;
import org.apache.doris.load.FailMsg;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.OriginStatement;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.qe.SqlModeHelper;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.transaction.BeginTransactionException;
import org.apache.doris.transaction.TabletCommitInfo;
import org.apache.doris.transaction.TransactionState;
import org.apache.doris.transaction.TransactionState.TxnSourceType;
import org.apache.doris.transaction.TransactionState.TxnCoordinator;
import com.google.common.base.Joiner;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.StringReader;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
/**
@ -82,129 +58,22 @@ import java.util.UUID;
* Step2: LoadLoadingTasks will be created by the onTaskFinished method when the BrokerPendingTask is finished.
* Step3: CommitAndPublishTxn will be called by the onTaskFinished method when all of the LoadLoadingTasks are finished.
*/
public class BrokerLoadJob extends LoadJob {
public class BrokerLoadJob extends BulkLoadJob {
private static final Logger LOG = LogManager.getLogger(BrokerLoadJob.class);
// input params
private BrokerDesc brokerDesc;
// this param is used to persist the expr of columns
// the origin stmt is persisted instead of columns expr
// the expr of columns will be reanalyze when the log is replayed
private OriginStatement originStmt;
// include broker desc and data desc
private BrokerFileGroupAggInfo fileGroupAggInfo = new BrokerFileGroupAggInfo();
private List<TabletCommitInfo> commitInfos = Lists.newArrayList();
// sessionVariable's name -> sessionVariable's value
// we persist these sessionVariables due to the session is not available when replaying the job.
private Map<String, String> sessionVariables = Maps.newHashMap();
// only for log replay
public BrokerLoadJob() {
super();
this.jobType = EtlJobType.BROKER;
}
private BrokerLoadJob(long dbId, String label, BrokerDesc brokerDesc, OriginStatement originStmt)
public BrokerLoadJob(long dbId, String label, BrokerDesc brokerDesc, OriginStatement originStmt)
throws MetaNotFoundException {
super(dbId, label);
super(dbId, label, originStmt);
this.timeoutSecond = Config.broker_load_default_timeout_second;
this.brokerDesc = brokerDesc;
this.originStmt = originStmt;
this.jobType = EtlJobType.BROKER;
this.authorizationInfo = gatherAuthInfo();
if (ConnectContext.get() != null) {
SessionVariable var = ConnectContext.get().getSessionVariable();
sessionVariables.put(SessionVariable.SQL_MODE, Long.toString(var.getSqlMode()));
} else {
sessionVariables.put(SessionVariable.SQL_MODE, String.valueOf(SqlModeHelper.MODE_DEFAULT));
}
}
public static BrokerLoadJob fromLoadStmt(LoadStmt stmt) throws DdlException {
// get db id
String dbName = stmt.getLabel().getDbName();
Database db = Catalog.getCurrentCatalog().getDb(stmt.getLabel().getDbName());
if (db == null) {
throw new DdlException("Database[" + dbName + "] does not exist");
}
// create job
try {
BrokerLoadJob brokerLoadJob = new BrokerLoadJob(db.getId(), stmt.getLabel().getLabelName(),
stmt.getBrokerDesc(), stmt.getOrigStmt());
brokerLoadJob.setJobProperties(stmt.getProperties());
brokerLoadJob.checkAndSetDataSourceInfo(db, stmt.getDataDescriptions());
return brokerLoadJob;
} catch (MetaNotFoundException e) {
throw new DdlException(e.getMessage());
}
}
private void checkAndSetDataSourceInfo(Database db, List<DataDescription> dataDescriptions) throws DdlException {
// check data source info
db.readLock();
try {
for (DataDescription dataDescription : dataDescriptions) {
BrokerFileGroup fileGroup = new BrokerFileGroup(dataDescription);
fileGroup.parse(db, dataDescription);
fileGroupAggInfo.addFileGroup(fileGroup);
}
} finally {
db.readUnlock();
}
}
private AuthorizationInfo gatherAuthInfo() throws MetaNotFoundException {
Database database = Catalog.getCurrentCatalog().getDb(dbId);
if (database == null) {
throw new MetaNotFoundException("Database " + dbId + "has been deleted");
}
return new AuthorizationInfo(database.getFullName(), getTableNames());
}
@Override
public Set<String> getTableNamesForShow() {
Set<String> result = Sets.newHashSet();
Database database = Catalog.getCurrentCatalog().getDb(dbId);
if (database == null) {
for (long tableId : fileGroupAggInfo.getAllTableIds()) {
result.add(String.valueOf(tableId));
}
return result;
}
for (long tableId : fileGroupAggInfo.getAllTableIds()) {
Table table = database.getTable(tableId);
if (table == null) {
result.add(String.valueOf(tableId));
} else {
result.add(table.getName());
}
}
return result;
}
@Override
public Set<String> getTableNames() throws MetaNotFoundException{
Set<String> result = Sets.newHashSet();
Database database = Catalog.getCurrentCatalog().getDb(dbId);
if (database == null) {
throw new MetaNotFoundException("Database " + dbId + "has been deleted");
}
// The database will not be locked in here.
// The getTable is a thread-safe method called without read lock of database
for (long tableId : fileGroupAggInfo.getAllTableIds()) {
Table table = database.getTable(tableId);
if (table == null) {
throw new MetaNotFoundException("Failed to find table " + tableId + " in db " + dbId);
} else {
result.add(table.getName());
}
}
return result;
}
@Override
@ -242,76 +111,6 @@ public class BrokerLoadJob extends LoadJob {
}
}
@Override
public void onTaskFailed(long taskId, FailMsg failMsg) {
writeLock();
try {
// check if job has been completed
if (isTxnDone()) {
LOG.warn(new LogBuilder(LogKey.LOAD_JOB, id)
.add("state", state)
.add("error_msg", "this task will be ignored when job is: " + state)
.build());
return;
}
LoadTask loadTask = idToTasks.get(taskId);
if (loadTask == null) {
return;
}
if (loadTask.getRetryTime() <= 0) {
unprotectedExecuteCancel(failMsg, true);
logFinalOperation();
return;
} else {
// retry task
idToTasks.remove(loadTask.getSignature());
if (loadTask instanceof LoadLoadingTask) {
loadStatistic.removeLoad(((LoadLoadingTask) loadTask).getLoadId());
}
loadTask.updateRetryInfo();
idToTasks.put(loadTask.getSignature(), loadTask);
// load id will be added to loadStatistic when executing this task
Catalog.getCurrentCatalog().getLoadTaskScheduler().submit(loadTask);
return;
}
} finally {
writeUnlock();
}
}
/**
* If the db or table could not be found, the Broker load job will be cancelled.
*/
@Override
public void analyze() {
if (originStmt == null || Strings.isNullOrEmpty(originStmt.originStmt)) {
return;
}
// Reset dataSourceInfo, it will be re-created in analyze
fileGroupAggInfo = new BrokerFileGroupAggInfo();
SqlParser parser = new SqlParser(new SqlScanner(new StringReader(originStmt.originStmt),
Long.valueOf(sessionVariables.get(SessionVariable.SQL_MODE))));
LoadStmt stmt = null;
try {
stmt = (LoadStmt) SqlParserUtils.getStmt(parser, originStmt.idx);
for (DataDescription dataDescription : stmt.getDataDescriptions()) {
dataDescription.analyzeWithoutCheckPriv();
}
Database db = Catalog.getCurrentCatalog().getDb(dbId);
if (db == null) {
throw new DdlException("Database[" + dbId + "] does not exist");
}
checkAndSetDataSourceInfo(db, stmt.getDataDescriptions());
} catch (Exception e) {
LOG.info(new LogBuilder(LogKey.LOAD_JOB, id)
.add("origin_stmt", originStmt)
.add("msg", "The failure happens in analyze, the load job will be cancelled with error:"
+ e.getMessage())
.build(), e);
cancelJobWithoutCheck(new FailMsg(FailMsg.CancelType.LOAD_RUN_FAIL, e.getMessage()), false, true);
}
}
/**
* step1: divide job into loading task
* step2: init the plan of task
@ -514,63 +313,4 @@ public class BrokerLoadJob extends LoadJob {
}
return String.valueOf(value);
}
@Override
protected void replayTxnAttachment(TransactionState txnState) {
if (txnState.getTxnCommitAttachment() == null) {
// The txn attachment maybe null when broker load has been cancelled without attachment.
// The end log of broker load has been record but the callback id of txnState hasn't been removed
// So the callback of txn is executed when log of txn aborted is replayed.
return;
}
unprotectReadEndOperation((LoadJobFinalOperation) txnState.getTxnCommitAttachment());
}
@Override
public void write(DataOutput out) throws IOException {
super.write(out);
brokerDesc.write(out);
originStmt.write(out);
out.writeInt(sessionVariables.size());
for (Map.Entry<String, String> entry : sessionVariables.entrySet()) {
Text.writeString(out, entry.getKey());
Text.writeString(out, entry.getValue());
}
}
public void readFields(DataInput in) throws IOException {
super.readFields(in);
brokerDesc = BrokerDesc.read(in);
if (Catalog.getCurrentCatalogJournalVersion() < FeMetaVersion.VERSION_61) {
fileGroupAggInfo.readFields(in);
}
if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_58) {
if (Catalog.getCurrentCatalogJournalVersion() < FeMetaVersion.VERSION_76) {
String stmt = Text.readString(in);
originStmt = new OriginStatement(stmt, 0);
} else {
originStmt = OriginStatement.read(in);
}
} else {
originStmt = new OriginStatement("", 0);
}
// The origin stmt does not be analyzed in here.
// The reason is that it will thrown MetaNotFoundException when the tableId could not be found by tableName.
// The origin stmt will be analyzed after the replay is completed.
if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_66) {
int size = in.readInt();
for (int i = 0; i < size; i++) {
String key = Text.readString(in);
String value = Text.readString(in);
sessionVariables.put(key, value);
}
} else {
// old version of load does not have sqlmode, set it to default
sessionVariables.put(SessionVariable.SQL_MODE, String.valueOf(SqlModeHelper.MODE_DEFAULT));
}
}
}

View File

@ -0,0 +1,328 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.load.loadv2;
import org.apache.doris.analysis.BrokerDesc;
import org.apache.doris.analysis.DataDescription;
import org.apache.doris.analysis.LoadStmt;
import org.apache.doris.analysis.SqlParser;
import org.apache.doris.analysis.SqlScanner;
import org.apache.doris.catalog.AuthorizationInfo;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Table;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.util.LogBuilder;
import org.apache.doris.common.util.LogKey;
import org.apache.doris.common.util.SqlParserUtils;
import org.apache.doris.load.BrokerFileGroup;
import org.apache.doris.load.BrokerFileGroupAggInfo;
import org.apache.doris.load.FailMsg;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.OriginStatement;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.qe.SqlModeHelper;
import org.apache.doris.transaction.TabletCommitInfo;
import org.apache.doris.transaction.TransactionState;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.StringReader;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* Parent class of BrokerLoadJob and SparkLoadJob, both of which are created from a load stmt.
*/
public abstract class BulkLoadJob extends LoadJob {
private static final Logger LOG = LogManager.getLogger(BulkLoadJob.class);
// input params
protected BrokerDesc brokerDesc;
// this param is used to persist the expr of columns
// the origin stmt is persisted instead of columns expr
// the expr of columns will be reanalyzed when the log is replayed
private OriginStatement originStmt;
// include broker desc and data desc
protected BrokerFileGroupAggInfo fileGroupAggInfo = new BrokerFileGroupAggInfo();
protected List<TabletCommitInfo> commitInfos = Lists.newArrayList();
// sessionVariable's name -> sessionVariable's value
// we persist these sessionVariables because the session is not available when replaying the job.
private Map<String, String> sessionVariables = Maps.newHashMap();
// only for log replay
public BulkLoadJob() {
super();
}
public BulkLoadJob(long dbId, String label, OriginStatement originStmt) throws MetaNotFoundException {
super(dbId, label);
this.originStmt = originStmt;
this.authorizationInfo = gatherAuthInfo();
if (ConnectContext.get() != null) {
SessionVariable var = ConnectContext.get().getSessionVariable();
sessionVariables.put(SessionVariable.SQL_MODE, Long.toString(var.getSqlMode()));
} else {
sessionVariables.put(SessionVariable.SQL_MODE, String.valueOf(SqlModeHelper.MODE_DEFAULT));
}
}
public static BulkLoadJob fromLoadStmt(LoadStmt stmt) throws DdlException {
// get db id
String dbName = stmt.getLabel().getDbName();
Database db = Catalog.getCurrentCatalog().getDb(dbName);
if (db == null) {
throw new DdlException("Database[" + dbName + "] does not exist");
}
// create job
BulkLoadJob bulkLoadJob = null;
try {
switch (stmt.getEtlJobType()) {
case BROKER:
bulkLoadJob = new BrokerLoadJob(db.getId(), stmt.getLabel().getLabelName(),
stmt.getBrokerDesc(), stmt.getOrigStmt());
break;
case SPARK:
bulkLoadJob = new SparkLoadJob(db.getId(), stmt.getLabel().getLabelName(),
stmt.getResourceDesc(), stmt.getOrigStmt());
break;
case MINI:
case DELETE:
case HADOOP:
case INSERT:
throw new DdlException("LoadManager only support create broker and spark load job from stmt.");
default:
throw new DdlException("Unknown load job type.");
}
bulkLoadJob.setJobProperties(stmt.getProperties());
bulkLoadJob.checkAndSetDataSourceInfo(db, stmt.getDataDescriptions());
return bulkLoadJob;
} catch (MetaNotFoundException e) {
throw new DdlException(e.getMessage());
}
}
private void checkAndSetDataSourceInfo(Database db, List<DataDescription> dataDescriptions) throws DdlException {
// check data source info
db.readLock();
try {
for (DataDescription dataDescription : dataDescriptions) {
BrokerFileGroup fileGroup = new BrokerFileGroup(dataDescription);
fileGroup.parse(db, dataDescription);
fileGroupAggInfo.addFileGroup(fileGroup);
}
} finally {
db.readUnlock();
}
}
private AuthorizationInfo gatherAuthInfo() throws MetaNotFoundException {
Database database = Catalog.getCurrentCatalog().getDb(dbId);
if (database == null) {
throw new MetaNotFoundException("Database " + dbId + "has been deleted");
}
return new AuthorizationInfo(database.getFullName(), getTableNames());
}
@Override
public Set<String> getTableNamesForShow() {
Set<String> result = Sets.newHashSet();
Database database = Catalog.getCurrentCatalog().getDb(dbId);
if (database == null) {
for (long tableId : fileGroupAggInfo.getAllTableIds()) {
result.add(String.valueOf(tableId));
}
return result;
}
for (long tableId : fileGroupAggInfo.getAllTableIds()) {
Table table = database.getTable(tableId);
if (table == null) {
result.add(String.valueOf(tableId));
} else {
result.add(table.getName());
}
}
return result;
}
@Override
public Set<String> getTableNames() throws MetaNotFoundException{
Set<String> result = Sets.newHashSet();
Database database = Catalog.getCurrentCatalog().getDb(dbId);
if (database == null) {
throw new MetaNotFoundException("Database " + dbId + " has been deleted");
}
// The database is not locked here.
// getTable is thread-safe and can be called without the database read lock
for (long tableId : fileGroupAggInfo.getAllTableIds()) {
Table table = database.getTable(tableId);
if (table == null) {
throw new MetaNotFoundException("Failed to find table " + tableId + " in db " + dbId);
} else {
result.add(table.getName());
}
}
return result;
}
@Override
public void onTaskFailed(long taskId, FailMsg failMsg) {
writeLock();
try {
// check if job has been completed
if (isTxnDone()) {
LOG.warn(new LogBuilder(LogKey.LOAD_JOB, id)
.add("state", state)
.add("error_msg", "this task will be ignored when job is: " + state)
.build());
return;
}
LoadTask loadTask = idToTasks.get(taskId);
if (loadTask == null) {
return;
}
if (loadTask.getRetryTime() <= 0) {
unprotectedExecuteCancel(failMsg, true);
logFinalOperation();
return;
} else {
// retry task
idToTasks.remove(loadTask.getSignature());
if (loadTask instanceof LoadLoadingTask) {
loadStatistic.removeLoad(((LoadLoadingTask) loadTask).getLoadId());
}
loadTask.updateRetryInfo();
idToTasks.put(loadTask.getSignature(), loadTask);
// load id will be added to loadStatistic when executing this task
Catalog.getCurrentCatalog().getLoadTaskScheduler().submit(loadTask);
return;
}
} finally {
writeUnlock();
}
}
/**
* If the db or table could not be found, the load job will be cancelled.
*/
@Override
public void analyze() {
if (originStmt == null || Strings.isNullOrEmpty(originStmt.originStmt)) {
return;
}
// Reset dataSourceInfo, it will be re-created in analyze
fileGroupAggInfo = new BrokerFileGroupAggInfo();
SqlParser parser = new SqlParser(new SqlScanner(new StringReader(originStmt.originStmt),
Long.valueOf(sessionVariables.get(SessionVariable.SQL_MODE))));
LoadStmt stmt = null;
try {
stmt = (LoadStmt) SqlParserUtils.getStmt(parser, originStmt.idx);
for (DataDescription dataDescription : stmt.getDataDescriptions()) {
dataDescription.analyzeWithoutCheckPriv();
}
Database db = Catalog.getCurrentCatalog().getDb(dbId);
if (db == null) {
throw new DdlException("Database[" + dbId + "] does not exist");
}
checkAndSetDataSourceInfo(db, stmt.getDataDescriptions());
} catch (Exception e) {
LOG.info(new LogBuilder(LogKey.LOAD_JOB, id)
.add("origin_stmt", originStmt)
.add("msg", "The failure happens in analyze, the load job will be cancelled with error:"
+ e.getMessage())
.build(), e);
cancelJobWithoutCheck(new FailMsg(FailMsg.CancelType.LOAD_RUN_FAIL, e.getMessage()), false, true);
}
}
@Override
protected void replayTxnAttachment(TransactionState txnState) {
if (txnState.getTxnCommitAttachment() == null) {
// The txn attachment may be null when the load has been cancelled without an attachment.
// The end log of the load has been recorded but the callback id of txnState hasn't been removed,
// so the callback of the txn is executed when the log of the aborted txn is replayed.
return;
}
unprotectReadEndOperation((LoadJobFinalOperation) txnState.getTxnCommitAttachment());
}
@Override
public void write(DataOutput out) throws IOException {
super.write(out);
brokerDesc.write(out);
originStmt.write(out);
out.writeInt(sessionVariables.size());
for (Map.Entry<String, String> entry : sessionVariables.entrySet()) {
Text.writeString(out, entry.getKey());
Text.writeString(out, entry.getValue());
}
}
public void readFields(DataInput in) throws IOException {
super.readFields(in);
brokerDesc = BrokerDesc.read(in);
if (Catalog.getCurrentCatalogJournalVersion() < FeMetaVersion.VERSION_61) {
fileGroupAggInfo.readFields(in);
}
if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_58) {
if (Catalog.getCurrentCatalogJournalVersion() < FeMetaVersion.VERSION_76) {
String stmt = Text.readString(in);
originStmt = new OriginStatement(stmt, 0);
} else {
originStmt = OriginStatement.read(in);
}
} else {
originStmt = new OriginStatement("", 0);
}
// The origin stmt is not analyzed here.
// The reason is that it would throw MetaNotFoundException when the tableId cannot be found by tableName.
// The origin stmt will be analyzed after the replay is completed.
if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_66) {
int size = in.readInt();
for (int i = 0; i < size; i++) {
String key = Text.readString(in);
String value = Text.readString(in);
sessionVariables.put(key, value);
}
} else {
// old version of load does not have sqlmode, set it to default
sessionVariables.put(SessionVariable.SQL_MODE, String.valueOf(SqlModeHelper.MODE_DEFAULT));
}
}
}

View File

@ -21,6 +21,7 @@ package org.apache.doris.load.loadv2;
public enum JobState {
UNKNOWN, // this is only for ISSUE #2354
PENDING, // init state
ETL, // partition, sort and aggregate the load data with the etl cluster
LOADING, // job is running
COMMITTED, // transaction is committed but not visible
FINISHED, // transaction is visible and job is finished

View File

@ -17,6 +17,7 @@
package org.apache.doris.load.loadv2;
import static org.apache.doris.load.FailMsg.CancelType.ETL_RUN_FAIL;
import static org.apache.doris.load.FailMsg.CancelType.LOAD_RUN_FAIL;
import org.apache.doris.analysis.CancelLoadStmt;
@ -104,14 +105,14 @@ public class LoadManager implements Writable{
writeLock();
try {
checkLabelUsed(dbId, stmt.getLabel().getLabelName());
if (stmt.getBrokerDesc() == null) {
throw new DdlException("LoadManager only support the broker load.");
if (stmt.getBrokerDesc() == null && stmt.getResourceDesc() == null) {
throw new DdlException("LoadManager only support the broker and spark load.");
}
if (loadJobScheduler.isQueueFull()) {
throw new DdlException("There are more then " + Config.desired_max_waiting_jobs + " load jobs in waiting queue, "
+ "please retry later.");
}
loadJob = BrokerLoadJob.fromLoadStmt(stmt);
loadJob = BulkLoadJob.fromLoadStmt(stmt);
createLoadJob(loadJob);
} finally {
writeUnlock();
@ -529,7 +530,6 @@ public class LoadManager implements Writable{
*
* @param dbId
* @param label
* @param requestId: the uuid of each txn request from BE
* @throws LabelAlreadyUsedException throw exception when label has been used by an unfinished job.
*/
private void checkLabelUsed(long dbId, String label)

View File

@ -0,0 +1,248 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.load.loadv2;
import com.google.common.base.Strings;
import org.apache.doris.analysis.BrokerDesc;
import org.apache.doris.analysis.ResourceDesc;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Resource;
import org.apache.doris.catalog.SparkResource;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.io.Text;
import org.apache.doris.load.EtlJobType;
import org.apache.doris.load.FailMsg;
import org.apache.doris.qe.OriginStatement;
import org.apache.doris.task.AgentTaskQueue;
import org.apache.doris.task.PushTask;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.spark.launcher.SparkAppHandle;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Map;
import java.util.Set;
/**
* There are 4 steps in SparkLoadJob:
* Step1: SparkLoadPendingTask will be created by the unprotectedExecuteJob method and will submit the spark etl job.
* Step2: LoadEtlChecker will check the spark etl job status periodically and send push tasks to BEs when the spark etl job is finished.
* Step3: LoadLoadingChecker will check the loading status periodically and commit the transaction when all push tasks are finished.
* Step4: PublishVersionDaemon will send publish version tasks to BEs and finish the transaction.
*/
public class SparkLoadJob extends BulkLoadJob {
private static final Logger LOG = LogManager.getLogger(SparkLoadJob.class);
// for global dict
public static final String BITMAP_DATA_PROPERTY = "bitmap_data";
// --- members below need persist ---
// create from resourceDesc when job created
private SparkResource sparkResource;
// members below updated when job state changed to etl
private long etlStartTimestamp = -1;
// for spark yarn
private String appId = "";
// spark job outputPath
private String etlOutputPath = "";
// members below updated when job state changed to loading
// { tableId.partitionId.indexId.bucket.schemaHash -> (etlFilePath, etlFileSize) }
private Map<String, Pair<String, Long>> tabletMetaToFileInfo = Maps.newHashMap();
// --- members below not persist ---
// temporary use
// one SparkLoadJob has only one table to load
// hivedb.table for global dict
private String hiveTableName = "";
private ResourceDesc resourceDesc;
// for spark standalone
private SparkAppHandle sparkAppHandle;
// for straggler wait long time to commit transaction
private long quorumFinishTimestamp = -1;
// below for push task
private Map<Long, Set<Long>> tableToLoadPartitions = Maps.newHashMap();
//private Map<Long, PushBrokerScannerParams> indexToPushBrokerReaderParams = Maps.newHashMap();
private Map<Long, Integer> indexToSchemaHash = Maps.newHashMap();
private Map<Long, Map<Long, PushTask>> tabletToSentReplicaPushTask = Maps.newHashMap();
private Set<Long> finishedReplicas = Sets.newHashSet();
private Set<Long> quorumTablets = Sets.newHashSet();
private Set<Long> fullTablets = Sets.newHashSet();
// only for log replay
public SparkLoadJob() {
super();
jobType = EtlJobType.SPARK;
}
public SparkLoadJob(long dbId, String label, ResourceDesc resourceDesc, OriginStatement originStmt)
throws MetaNotFoundException {
super(dbId, label, originStmt);
this.resourceDesc = resourceDesc;
timeoutSecond = Config.spark_load_default_timeout_second;
jobType = EtlJobType.SPARK;
}
public String getHiveTableName() {
return hiveTableName;
}
@Override
protected void setJobProperties(Map<String, String> properties) throws DdlException {
super.setJobProperties(properties);
// set spark resource and broker desc
setResourceInfo();
// global dict
if (properties != null) {
if (properties.containsKey(BITMAP_DATA_PROPERTY)) {
hiveTableName = properties.get(BITMAP_DATA_PROPERTY);
}
}
}
/**
* merge system conf with load stmt
* @throws DdlException
*/
private void setResourceInfo() throws DdlException {
// spark resource
String resourceName = resourceDesc.getName();
Resource oriResource = Catalog.getCurrentCatalog().getResourceMgr().getResource(resourceName);
if (oriResource == null) {
throw new DdlException("Resource does not exist. name: " + resourceName);
}
sparkResource = ((SparkResource) oriResource).getCopiedResource();
sparkResource.update(resourceDesc);
// broker desc
Map<String, String> brokerProperties = sparkResource.getBrokerPropertiesWithoutPrefix();
brokerDesc = new BrokerDesc(sparkResource.getBroker(), brokerProperties);
}
/**
* When the load job is already cancelled or finished, clean it up as follows:
* 1. kill the etl job and delete the etl files
* 2. clear push tasks and infos that are not persisted
*/
private void clearJob() {
Preconditions.checkState(state == JobState.FINISHED || state == JobState.CANCELLED);
LOG.debug("kill etl job and delete etl files. id: {}, state: {}", id, state);
// TODO(wyb): spark-load
//SparkEtlJobHandler handler = new SparkEtlJobHandler();
if (state == JobState.CANCELLED) {
if ((!Strings.isNullOrEmpty(appId) && sparkResource.isYarnMaster()) || sparkAppHandle != null) {
try {
// TODO(wyb): spark-load
//handler.killEtlJob(sparkAppHandle, appId, id, sparkResource);
} catch (Exception e) {
LOG.warn("kill etl job failed. id: {}, state: {}", id, state, e);
}
}
}
if (!Strings.isNullOrEmpty(etlOutputPath)) {
try {
// delete label dir, remove the last taskId dir
String outputPath = etlOutputPath.substring(0, etlOutputPath.lastIndexOf("/"));
// TODO(wyb): spark-load
//handler.deleteEtlOutputPath(outputPath, brokerDesc);
} catch (Exception e) {
LOG.warn("delete etl files failed. id: {}, state: {}", id, state, e);
}
}
LOG.debug("clear push tasks and infos that not persist. id: {}, state: {}", id, state);
writeLock();
try {
// clear push task first
for (Map<Long, PushTask> sentReplicaPushTask : tabletToSentReplicaPushTask.values()) {
for (PushTask pushTask : sentReplicaPushTask.values()) {
if (pushTask == null) {
continue;
}
AgentTaskQueue.removeTask(pushTask.getBackendId(), pushTask.getTaskType(), pushTask.getSignature());
}
}
// clear job infos that are not persisted
hiveTableName = "";
sparkAppHandle = null;
resourceDesc = null;
tableToLoadPartitions.clear();
//indexToPushBrokerReaderParams.clear();
indexToSchemaHash.clear();
tabletToSentReplicaPushTask.clear();
finishedReplicas.clear();
quorumTablets.clear();
fullTablets.clear();
} finally {
writeUnlock();
}
}
@Override
public void cancelJobWithoutCheck(FailMsg failMsg, boolean abortTxn, boolean needLog) {
super.cancelJobWithoutCheck(failMsg, abortTxn, needLog);
clearJob();
}
@Override
public void cancelJob(FailMsg failMsg) throws DdlException {
super.cancelJob(failMsg);
clearJob();
}
@Override
public void write(DataOutput out) throws IOException {
super.write(out);
sparkResource.write(out);
out.writeLong(etlStartTimestamp);
Text.writeString(out, appId);
Text.writeString(out, etlOutputPath);
out.writeInt(tabletMetaToFileInfo.size());
for (Map.Entry<String, Pair<String, Long>> entry : tabletMetaToFileInfo.entrySet()) {
Text.writeString(out, entry.getKey());
Text.writeString(out, entry.getValue().first);
out.writeLong(entry.getValue().second);
}
}
public void readFields(DataInput in) throws IOException {
super.readFields(in);
sparkResource = (SparkResource) Resource.read(in);
etlStartTimestamp = in.readLong();
appId = Text.readString(in);
etlOutputPath = Text.readString(in);
int size = in.readInt();
for (int i = 0; i < size; i++) {
String tabletMetaStr = Text.readString(in);
Pair<String, Long> fileInfo = Pair.create(Text.readString(in), in.readLong());
tabletMetaToFileInfo.put(tabletMetaStr, fileInfo);
}
}
}

View File

@ -118,15 +118,13 @@ public class DdlExecutor {
catalog.cancelAlter((CancelAlterTableStmt) ddlStmt);
} else if (ddlStmt instanceof LoadStmt) {
LoadStmt loadStmt = (LoadStmt) ddlStmt;
EtlJobType jobType;
if (loadStmt.getBrokerDesc() != null) {
jobType = EtlJobType.BROKER;
} else {
if (Config.disable_hadoop_load) {
throw new DdlException("Load job by hadoop cluster is disabled."
+ " Try using broker load. See 'help broker load;'");
}
jobType = EtlJobType.HADOOP;
EtlJobType jobType = loadStmt.getEtlJobType();
if (jobType == EtlJobType.UNKNOWN) {
throw new DdlException("Unknown load job type");
}
if (jobType == EtlJobType.HADOOP && Config.disable_hadoop_load) {
throw new DdlException("Load job by hadoop cluster is disabled."
+ " Try using broker load. See 'help broker load;'");
}
if (loadStmt.getVersion().equals(Load.VERSION) || jobType == EtlJobType.HADOOP) {
catalog.getLoadManager().createLoadJobV1FromStmt(loadStmt, jobType, System.currentTimeMillis());

View File

@ -17,9 +17,14 @@
package org.apache.doris.analysis;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.ResourceMgr;
import org.apache.doris.catalog.SparkResource;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.UserException;
import org.apache.doris.load.EtlJobType;
import org.apache.doris.mysql.privilege.PaloAuth;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.qe.ConnectContext;
import com.google.common.collect.Lists;
@ -68,15 +73,26 @@ public class LoadStmtTest {
}
@Test
public void testNormal(@Injectable DataDescription desc) throws UserException, AnalysisException {
public void testNormal(@Injectable DataDescription desc, @Mocked Catalog catalog,
@Injectable ResourceMgr resourceMgr, @Injectable PaloAuth auth) throws UserException, AnalysisException {
List<DataDescription> dataDescriptionList = Lists.newArrayList();
dataDescriptionList.add(desc);
String resourceName = "spark0";
SparkResource resource = new SparkResource(resourceName);
new Expectations(){
{
desc.toSql();
minTimes = 0;
result = "XXX";
catalog.getResourceMgr();
result = resourceMgr;
resourceMgr.getResource(resourceName);
result = resource;
catalog.getAuth();
result = auth;
auth.checkResourcePriv((ConnectContext) any, resourceName, PrivPredicate.USAGE);
result = true;
}
};
@ -88,6 +104,16 @@ public class LoadStmtTest {
Assert.assertEquals("LOAD LABEL `testCluster:testDb`.`testLabel`\n"
+ "(XXX)", stmt.toString());
// test ResourceDesc
// TODO(wyb): spark-load
LoadStmt.disableSparkLoad = false;
stmt = new LoadStmt(new LabelName("testDb", "testLabel"), dataDescriptionList,
new ResourceDesc(resourceName, null), null);
stmt.analyze(analyzer);
Assert.assertEquals(EtlJobType.SPARK, stmt.getResourceDesc().getEtlJobType());
Assert.assertEquals("LOAD LABEL `testCluster:testDb`.`testLabel`\n(XXX)\nWITH RESOURCE 'spark0'",
stmt.toString());
}
@Test(expected = AnalysisException.class)

View File

@ -0,0 +1,80 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.analysis;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.ResourceMgr;
import org.apache.doris.catalog.SparkResource;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.load.EtlJobType;
import com.google.common.collect.Maps;
import org.junit.Assert;
import org.junit.Test;
import mockit.Expectations;
import mockit.Injectable;
import mockit.Mocked;
import java.util.Map;
public class ResourceDescTest {
@Test
public void testNormal(@Mocked Catalog catalog, @Injectable ResourceMgr resourceMgr)
throws AnalysisException, DdlException {
String resourceName = "spark0";
Map<String, String> properties = Maps.newHashMap();
String key = "spark.executor.memory";
String value = "2g";
properties.put(key, value);
ResourceDesc resourceDesc = new ResourceDesc(resourceName, properties);
SparkResource resource = new SparkResource(resourceName);
new Expectations() {
{
catalog.getResourceMgr();
result = resourceMgr;
resourceMgr.getResource(resourceName);
result = resource;
}
};
resourceDesc.analyze();
Assert.assertEquals(resourceName, resourceDesc.getName());
Assert.assertEquals(value, resourceDesc.getProperties().get(key));
Assert.assertEquals(EtlJobType.SPARK, resourceDesc.getEtlJobType());
}
@Test(expected = AnalysisException.class)
public void testNoResource(@Mocked Catalog catalog, @Injectable ResourceMgr resourceMgr) throws AnalysisException {
String resourceName = "spark1";
ResourceDesc resourceDesc = new ResourceDesc(resourceName, null);
new Expectations() {
{
catalog.getResourceMgr();
result = resourceMgr;
resourceMgr.getResource(resourceName);
result = null;
}
};
resourceDesc.analyze();
}
}

View File

@ -20,7 +20,7 @@ package org.apache.doris.catalog;
import org.apache.doris.analysis.AccessTestUtil;
import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.CreateResourceStmt;
//import org.apache.doris.analysis.ResourceDesc;
import org.apache.doris.analysis.ResourceDesc;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.proc.BaseProcResult;
@ -119,7 +119,6 @@ public class SparkResourceTest {
Assert.assertEquals(9, result.getRows().size());
}
/*
@Test
public void testUpdate(@Injectable BrokerMgr brokerMgr, @Mocked Catalog catalog, @Injectable PaloAuth auth)
throws UserException {
@ -156,7 +155,6 @@ public class SparkResourceTest {
Assert.assertEquals(6, map.size());
Assert.assertEquals("2g", copiedResource.getSparkConfigs().get("spark.driver.memory"));
}
*/
@Test(expected = DdlException.class)
public void testNoBroker(@Injectable BrokerMgr brokerMgr, @Mocked Catalog catalog, @Injectable PaloAuth auth)

View File

@ -101,7 +101,7 @@ public class BrokerLoadJobTest {
};
try {
BrokerLoadJob.fromLoadStmt(loadStmt);
BulkLoadJob.fromLoadStmt(loadStmt);
Assert.fail();
} catch (DdlException e) {
System.out.println("could not find table named " + tableName);
@ -123,6 +123,7 @@ public class BrokerLoadJobTest {
String databaseName = "database";
List<DataDescription> dataDescriptionList = Lists.newArrayList();
dataDescriptionList.add(dataDescription);
BrokerDesc brokerDesc = new BrokerDesc("broker0", Maps.newHashMap());
new Expectations() {
{
@ -153,6 +154,12 @@ public class BrokerLoadJobTest {
database.getId();
minTimes = 0;
result = dbId;
loadStmt.getBrokerDesc();
minTimes = 0;
result = brokerDesc;
loadStmt.getEtlJobType();
minTimes = 0;
result = EtlJobType.BROKER;
}
};
@ -165,7 +172,7 @@ public class BrokerLoadJobTest {
};
try {
BrokerLoadJob brokerLoadJob = BrokerLoadJob.fromLoadStmt(loadStmt);
BrokerLoadJob brokerLoadJob = (BrokerLoadJob) BulkLoadJob.fromLoadStmt(loadStmt);
Assert.assertEquals(Long.valueOf(dbId), Deencapsulation.getField(brokerLoadJob, "dbId"));
Assert.assertEquals(label, Deencapsulation.getField(brokerLoadJob, "label"));
Assert.assertEquals(JobState.PENDING, Deencapsulation.getField(brokerLoadJob, "state"));