[Refactor] Remove redundant code of mini load and insert (#4966)

This PR deletes the mini load and insert code paths of the old load framework.
The task and scheduling logic of the old framework has been removed as well.
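To make the scope of the removal concrete, here is a minimal, self-contained sketch (the class and method names are hypothetical, not Doris source) of the dispatch that remains in the old framework's LoadChecker after this change, mirroring the hunks further down: only HADOOP jobs still get an old-style ETL task, while MINI and INSERT fall through to the default "unknown etl job type" branch.

// Illustrative sketch only; EtlDispatchSketch and buildEtlTask are invented for
// this example, while the job types mirror the LoadChecker hunks in this diff.
enum EtlJobType { HADOOP, MINI, INSERT }

public class EtlDispatchSketch {
    // After this PR, only HADOOP maps to an old-framework ETL task.
    static String buildEtlTask(EtlJobType type) {
        switch (type) {
            case HADOOP:
                return "HadoopLoadEtlTask";
            default:
                return null; // MINI and INSERT branches were removed
        }
    }

    public static void main(String[] args) {
        for (EtlJobType t : EtlJobType.values()) {
            System.out.println(t + " -> " + buildEtlTask(t));
        }
    }
}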
EmmyMiao87
2021-02-03 22:19:20 +08:00
committed by GitHub
parent 128752b4f9
commit 77b756fb87
14 changed files with 20 additions and 1687 deletions

View File

@@ -206,7 +206,6 @@ import org.apache.doris.task.AgentTaskExecutor;
import org.apache.doris.task.AgentTaskQueue;
import org.apache.doris.task.CreateReplicaTask;
import org.apache.doris.task.MasterTaskExecutor;
import org.apache.doris.task.PullLoadJobMgr;
import org.apache.doris.thrift.TStorageFormat;
import org.apache.doris.thrift.TStorageMedium;
import org.apache.doris.thrift.TStorageType;
@@ -367,7 +366,6 @@ public class Catalog {
private MetaReplayState metaReplayState;
private PullLoadJobMgr pullLoadJobMgr;
private BrokerMgr brokerMgr;
private ResourceMgr resourceMgr;
@@ -520,7 +518,6 @@ public class Catalog {
this.isDefaultClusterCreated = false;
this.pullLoadJobMgr = new PullLoadJobMgr(!isCheckpointCatalog);
this.brokerMgr = new BrokerMgr();
this.resourceMgr = new ResourceMgr();
@@ -589,10 +586,6 @@ public class Catalog {
return SingletonHolder.INSTANCE;
}
public PullLoadJobMgr getPullLoadJobMgr() {
return pullLoadJobMgr;
}
public BrokerMgr getBrokerMgr() {
return brokerMgr;
}

View File

@@ -595,7 +595,7 @@ public class Load {
* This is only used for hadoop load
*/
public static void checkAndCreateSource(Database db, DataDescription dataDescription,
Map<Long, Map<Long, List<Source>>> tableToPartitionSources, EtlJobType jobType) throws DdlException {
Map<Long, Map<Long, List<Source>>> tableToPartitionSources, EtlJobType jobType) throws DdlException {
Source source = new Source(dataDescription.getFilePaths());
long tableId = -1;
Set<Long> sourcePartitionIds = Sets.newHashSet();
@@ -779,7 +779,7 @@ public class Load {
Pair<String, List<String>> function = entry.getValue();
try {
DataDescription.validateMappingFunction(function.first, function.second, columnNameMap,
mappingColumn, dataDescription.isHadoopLoad());
mappingColumn, dataDescription.isHadoopLoad());
} catch (AnalysisException e) {
throw new DdlException(e.getMessage());
}
@@ -883,7 +883,7 @@ public class Load {
* (A, B, C) SET (__doris_shadow_B = B)
*/
ImportColumnDesc importColumnDesc = new ImportColumnDesc(column.getName(),
new SlotRef(null, originCol));
new SlotRef(null, originCol));
shadowColumnDescs.add(importColumnDesc);
}
} else {
@@ -918,7 +918,7 @@ public class Load {
Map<String, SlotDescriptor> slotDescByName, TBrokerScanRangeParams params) throws UserException {
rewriteColumns(columnExprs);
initColumns(tbl, columnExprs, columnToHadoopFunction, exprsByName, analyzer,
srcTupleDesc, slotDescByName, params, true);
srcTupleDesc, slotDescByName, params, true);
}
/*
@@ -946,7 +946,7 @@ public class Load {
}
// check whether the OlapTable has sequenceCol
boolean hasSequenceCol = false;
if (tbl instanceof OlapTable && ((OlapTable)tbl).hasSequenceCol()) {
if (tbl instanceof OlapTable && ((OlapTable) tbl).hasSequenceCol()) {
hasSequenceCol = true;
}
@@ -1771,12 +1771,7 @@ public class Load {
clearJob(job, srcState);
}
if (job.getBrokerDesc() != null) {
if (srcState == JobState.ETL) {
// Cancel job id
Catalog.getCurrentCatalog().getPullLoadJobMgr().cancelJob(job.getId());
}
}
Preconditions.checkState(job.getBrokerDesc() == null);
LOG.info("cancel load job success. job: {}", job);
return true;
}
@@ -1925,14 +1920,14 @@ public class Load {
if (tableNames.isEmpty()) {
// forward compatibility
if (!Catalog.getCurrentCatalog().getAuth().checkDbPriv(ConnectContext.get(), dbName,
PrivPredicate.LOAD)) {
PrivPredicate.LOAD)) {
continue;
}
} else {
boolean auth = true;
for (String tblName : tableNames) {
if (!Catalog.getCurrentCatalog().getAuth().checkTblPriv(ConnectContext.get(), dbName,
tblName, PrivPredicate.LOAD)) {
tblName, PrivPredicate.LOAD)) {
auth = false;
break;
}
@@ -2004,8 +1999,8 @@ public class Load {
// task info
jobInfo.add("cluster:" + loadJob.getHadoopCluster()
+ "; timeout(s):" + loadJob.getTimeoutSecond()
+ "; max_filter_ratio:" + loadJob.getMaxFilterRatio());
+ "; timeout(s):" + loadJob.getTimeoutSecond()
+ "; max_filter_ratio:" + loadJob.getMaxFilterRatio());
// error msg
if (loadJob.getState() == JobState.CANCELLED) {
@@ -2330,7 +2325,7 @@ public class Load {
continue;
}
replica.updateVersionInfo(info.getVersion(), info.getVersionHash(),
info.getDataSize(), info.getRowCount());
info.getDataSize(), info.getRowCount());
}
}
@@ -2349,7 +2344,7 @@ public class Load {
continue;
}
updatePartitionVersion(partition, partitionLoadInfo.getVersion(),
partitionLoadInfo.getVersionHash(), jobId);
partitionLoadInfo.getVersionHash(), jobId);
// update table row count
for (MaterializedIndex materializedIndex : partition.getMaterializedIndices(IndexExtState.ALL)) {
@@ -2446,7 +2441,7 @@ public class Load {
continue;
}
replica.updateVersionInfo(info.getVersion(), info.getVersionHash(),
info.getDataSize(), info.getRowCount());
info.getDataSize(), info.getRowCount());
}
}
} else {
@@ -2652,7 +2647,7 @@ public class Load {
// hdfs://host:port/outputPath/dbId/loadLabel/
DppConfig dppConfig = job.getHadoopDppConfig();
String outputPath = DppScheduler.getEtlOutputPath(dppConfig.getFsDefaultName(),
dppConfig.getOutputPath(), job.getDbId(), job.getLabel(), "");
dppConfig.getOutputPath(), job.getDbId(), job.getLabel(), "");
try {
dppScheduler.deleteEtlOutputPath(outputPath);
} catch (Exception e) {
@@ -2696,7 +2691,7 @@ public class Load {
}
public boolean updateLoadJobState(LoadJob job, JobState destState, CancelType cancelType, String msg,
List<String> failedMsg) {
List<String> failedMsg) {
boolean result = true;
JobState srcState = null;
@@ -2855,7 +2850,7 @@ public class Load {
}
updatePartitionVersion(partition, partitionLoadInfo.getVersion(),
partitionLoadInfo.getVersionHash(), jobId);
partitionLoadInfo.getVersionHash(), jobId);
for (MaterializedIndex materializedIndex : partition.getMaterializedIndices(IndexExtState.ALL)) {
long tableRowCount = 0L;
@@ -2889,7 +2884,7 @@ public class Load {
long partitionId = partition.getId();
partition.updateVisibleVersionAndVersionHash(version, versionHash);
LOG.info("update partition version success. version: {}, version hash: {}, job id: {}, partition id: {}",
version, versionHash, jobId, partitionId);
version, versionHash, jobId, partitionId);
}
private boolean processCancelled(LoadJob job, CancelType cancelType, String msg, List<String> failedMsg) {
@@ -2961,8 +2956,8 @@ public class Load {
if (srcState == JobState.LOADING || srcState == JobState.QUORUM_FINISHED) {
for (PushTask pushTask : job.getPushTasks()) {
AgentTaskQueue.removePushTask(pushTask.getBackendId(), pushTask.getSignature(),
pushTask.getVersion(), pushTask.getVersionHash(),
pushTask.getPushType(), pushTask.getTaskType());
pushTask.getVersion(), pushTask.getVersionHash(),
pushTask.getPushType(), pushTask.getTaskType());
}
}

View File

@@ -39,11 +39,8 @@ import org.apache.doris.task.AgentTaskExecutor;
import org.apache.doris.task.AgentTaskQueue;
import org.apache.doris.task.HadoopLoadEtlTask;
import org.apache.doris.task.HadoopLoadPendingTask;
import org.apache.doris.task.InsertLoadEtlTask;
import org.apache.doris.task.MasterTask;
import org.apache.doris.task.MasterTaskExecutor;
import org.apache.doris.task.MiniLoadEtlTask;
import org.apache.doris.task.MiniLoadPendingTask;
import org.apache.doris.task.PushTask;
import org.apache.doris.thrift.TPriority;
import org.apache.doris.thrift.TPushType;
@@ -172,9 +169,6 @@ public class LoadChecker extends MasterDaemon {
case HADOOP:
task = new HadoopLoadPendingTask(job);
break;
case MINI:
task = new MiniLoadPendingTask(job);
break;
default:
LOG.warn("unknown etl job type. type: {}", etlJobType.name());
break;
@@ -200,12 +194,6 @@ public class LoadChecker extends MasterDaemon {
case HADOOP:
task = new HadoopLoadEtlTask(job);
break;
case MINI:
task = new MiniLoadEtlTask(job);
break;
case INSERT:
task = new InsertLoadEtlTask(job);
break;
default:
LOG.warn("unknown etl job type. type: {}", etlJobType.name());
break;

View File

@@ -1,208 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.planner;
import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.load.LoadJob;
import org.apache.doris.load.PartitionLoadInfo;
import org.apache.doris.load.Source;
import org.apache.doris.load.TableLoadInfo;
import org.apache.doris.thrift.TColumnType;
import org.apache.doris.thrift.TCsvScanNode;
import org.apache.doris.thrift.TMiniLoadEtlFunction;
import org.apache.doris.thrift.TPlanNode;
import org.apache.doris.thrift.TPlanNodeType;
import org.apache.doris.thrift.TScanRangeLocations;
import com.google.common.base.MoreObjects;
import com.google.common.base.MoreObjects.ToStringHelper;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.Collection;
import java.util.List;
import java.util.Map;
public class CsvScanNode extends ScanNode {
private static final Logger LOG = LogManager.getLogger(CsvScanNode.class);
private final OlapTable table;
private final LoadJob job;
private List<String> filePaths = Lists.newArrayList();
private String columnSeparator;
private String lineDelimiter;
private List<String> columns = Lists.newArrayList();
private List<String> unspecifiedColumns = Lists.newArrayList();
private List<String> defaultValues = Lists.newArrayList();
private Map<String, TColumnType> columnTypeMapping = Maps.newHashMap();
private Map<String, TMiniLoadEtlFunction> columnToFunction = Maps.newHashMap();
private double maxFilterRatio = 0.0;
public CsvScanNode(PlanNodeId id, TupleDescriptor desc, OlapTable table, LoadJob job) {
super(id, desc, "Scan CSV");
this.table = table;
this.job = job;
}
@Override
protected void toThrift(TPlanNode msg) {
msg.node_type = TPlanNodeType.CSV_SCAN_NODE;
msg.csv_scan_node = new TCsvScanNode(desc.getId().asInt(), filePaths);
if (!Strings.isNullOrEmpty(columnSeparator)) {
msg.csv_scan_node.setColumnSeparator(columnSeparator);
}
if (!Strings.isNullOrEmpty(lineDelimiter)) {
msg.csv_scan_node.setLineDelimiter(lineDelimiter);
}
if (!columns.isEmpty()) {
msg.csv_scan_node.setColumns(columns);
}
if (!unspecifiedColumns.isEmpty()) {
msg.csv_scan_node.setUnspecifiedColumns(unspecifiedColumns);
}
if (!defaultValues.isEmpty()) {
msg.csv_scan_node.setDefaultValues(defaultValues);
}
if (!columnToFunction.isEmpty()) {
msg.csv_scan_node.setColumnFunctionMapping(columnToFunction);
}
msg.csv_scan_node.setColumnTypeMapping(columnTypeMapping);
msg.csv_scan_node.setMaxFilterRatio(maxFilterRatio);
msg.csv_scan_node.setColumnSeparator(columnSeparator);
}
@Override
public void finalize(Analyzer analyzer) throws UserException {
// get file paths
// file paths in different partitions are same in mini load
TableLoadInfo tableLoadInfo = job.getTableLoadInfo(table.getId());
Collection<PartitionLoadInfo> partitionLoadInfos = tableLoadInfo.getIdToPartitionLoadInfo().values();
Preconditions.checkState(!partitionLoadInfos.isEmpty());
PartitionLoadInfo partitionLoadInfo = (PartitionLoadInfo) partitionLoadInfos.toArray()[0];
List<Source> sources = partitionLoadInfo.getSources();
Preconditions.checkState(sources.size() == 1);
Source source = sources.get(0);
filePaths = source.getFileUrls();
// column separator
columnSeparator = source.getColumnSeparator();
// line delimiter
lineDelimiter = source.getLineDelimiter();
// construct columns (specified & unspecified) and default-values
List<String> columnNames = Lists.newArrayList();
for (Column column : table.getBaseSchema()) {
columnNames.add(column.getName());
}
columns = source.getColumnNames();
if (columns.isEmpty()) {
columns = columnNames;
}
for (String columnName : columns) {
if (!columnNames.contains(columnName)) {
LOG.info("Column [{}] is not exist in table schema, will be ignored.", columnName);
}
}
for (String columnName : columnNames) {
Column column = table.getColumn(columnName);
columnTypeMapping.put(columnName, column.getOriginType().toColumnTypeThrift());
if (columns.contains(columnName)) {
continue;
}
unspecifiedColumns.add(columnName);
String defaultValue = column.getDefaultValue();
if (defaultValue == null && false == column.isAllowNull()) {
throw new UserException(
"Column [" + columnName + "] should be specified. "
+ "only columns have default values can be omitted");
}
if (true == column.isAllowNull() && null == defaultValue) {
defaultValues.add("\\N");
} else {
defaultValues.add(defaultValue);
}
}
Map<String, Pair<String, List<String>>> functions = source.getColumnToFunction();
for (String key : functions.keySet()) {
final Pair<String, List<String>> pair = functions.get(key);
TMiniLoadEtlFunction function = new TMiniLoadEtlFunction();
int paramColumnIndex = -1;
for (String str : pair.second) {
boolean find = false;
for (int i = 0; i < columns.size(); i++) {
if (str.equals(columns.get(i))) {
paramColumnIndex = i;
find = true;
break;
}
}
if (find) {
function.setFunctionName(pair.first);
function.setParamColumnIndex(paramColumnIndex);
columnToFunction.put(key, function);
break;
}
}
}
// max filter ratio
// TODO: remove!!
maxFilterRatio = job.getMaxFilterRatio();
}
@Override
protected String debugString() {
ToStringHelper helper = MoreObjects.toStringHelper(this);
return helper.addValue(super.debugString()).toString();
}
/**
* like Mysql, We query Meta to get request's data location
* extra result info will pass to backend ScanNode
*/
@Override
public List<TScanRangeLocations> getScanRangeLocations(long maxScanRangeLength) {
return null;
}
@Override
public int getNumInstances() {
return 1;
}
}

View File

@@ -205,9 +205,7 @@ public class DistributedPlanner {
childFragments.get(0));
} else if (root instanceof SelectNode) {
result = createSelectNodeFragment((SelectNode) root, childFragments);
} else if (root instanceof OlapRewriteNode) {
result = createOlapRewriteNodeFragment((OlapRewriteNode) root, childFragments);
} else if (root instanceof SetOperationNode) {
} else if (root instanceof SetOperationNode) {
result = createSetOperationNodeFragment((SetOperationNode) root, childFragments, fragments);
} else if (root instanceof MergeNode) {
result = createMergeNodeFragment((MergeNode) root, childFragments, fragments);
@@ -838,15 +836,6 @@ public class DistributedPlanner {
return childFragment;
}
private PlanFragment createOlapRewriteNodeFragment(
OlapRewriteNode olapRewriteNode, ArrayList<PlanFragment> childFragments) {
Preconditions.checkState(olapRewriteNode.getChildren().size() == childFragments.size());
PlanFragment childFragment = childFragments.get(0);
olapRewriteNode.setChild(0, childFragment.getPlanRoot());
childFragment.setPlanRoot(olapRewriteNode);
return childFragment;
}
/**
* Replace node's child at index childIdx with an ExchangeNode that receives its input from childFragment.
*/

View File

@@ -1,126 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.planner;
import com.google.common.collect.Lists;
import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.ExprSubstitutionMap;
import org.apache.doris.analysis.InsertStmt;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Table;
import org.apache.doris.common.UserException;
import org.apache.doris.thrift.TExplainLevel;
import org.apache.doris.thrift.TOlapRewriteNode;
import org.apache.doris.thrift.TPlanNode;
import org.apache.doris.thrift.TPlanNodeType;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.List;
// Used to convert column to valid OLAP table
public class OlapRewriteNode extends PlanNode {
private static final Logger LOG = LogManager.getLogger(OlapRewriteNode.class);
private InsertStmt insertStmt;
private Table table;
private TupleDescriptor tupleDescriptor;
private List<Expr> newResultExprs;
public OlapRewriteNode(PlanNodeId id, PlanNode child, InsertStmt insertStmt) {
super(id, insertStmt.getOlapTuple().getId().asList(), "OLAP REWRITE NODE");
addChild(child);
this.table = insertStmt.getTargetTable();
this.tupleDescriptor = insertStmt.getOlapTuple();
this.insertStmt = insertStmt;
}
public OlapRewriteNode(PlanNodeId id, PlanNode child,
Table table,
TupleDescriptor tupleDescriptor,
List<Expr> slotRefs) {
super(id, child.getTupleIds(), "OLAP REWRITE NODE");
addChild(child);
this.table = table;
this.tupleDescriptor = tupleDescriptor;
this.newResultExprs = slotRefs;
}
@Override
public void init(Analyzer analyzer) throws UserException {
assignConjuncts(analyzer);
// Set smap to the combined children's smaps and apply that to all conjuncts_.
createDefaultSmap(analyzer);
computeStats(analyzer);
// assignedConjuncts = analyzr.getAssignedConjuncts();
if (insertStmt != null) {
ExprSubstitutionMap combinedChildSmap = getCombinedChildSmap();
newResultExprs = Lists.newArrayList();
for (Expr expr : insertStmt.getResultExprs()) {
newResultExprs.add(expr.clone(combinedChildSmap));
}
}
}
@Override
protected void toThrift(TPlanNode msg) {
msg.node_type = TPlanNodeType.OLAP_REWRITE_NODE;
TOlapRewriteNode tnode = new TOlapRewriteNode();
for (Column column : table.getBaseSchema()) {
tnode.addToColumnTypes(column.getOriginType().toColumnTypeThrift());
}
for (Expr expr : newResultExprs) {
tnode.addToColumns(expr.treeToThrift());
}
tnode.setOutputTupleId(tupleDescriptor.getId().asInt());
msg.setOlapRewriteNode(tnode);
}
@Override
public void computeStats(Analyzer analyzer) {
super.computeStats(analyzer);
long cardinality = getChild(0).cardinality;
double selectivity = computeSelectivity();
if (cardinality < 0 || selectivity < 0) {
this.cardinality = -1;
} else {
this.cardinality = Math.round(cardinality * selectivity);
}
}
@Override
protected String getNodeExplainString(String prefix, TExplainLevel detailLevel) {
StringBuilder output = new StringBuilder();
if (!conjuncts.isEmpty()) {
output.append(prefix + "predicates: " + getExplainString(conjuncts) + "\n");
}
return output.toString();
}
@Override
public int getNumInstances() {
return children.get(0).getNumInstances();
}
}

View File

@@ -1,38 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.task;
import org.apache.doris.common.LoadException;
import org.apache.doris.load.LoadJob;
public class InsertLoadEtlTask extends MiniLoadEtlTask {
public InsertLoadEtlTask(LoadJob job) {
super(job);
}
@Override
protected boolean updateJobEtlStatus() {
return true;
}
@Override
protected void processEtlRunning() throws LoadException {
throw new LoadException("not implement");
}
}

View File

@@ -1,198 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.task;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.common.LoadException;
import org.apache.doris.common.Pair;
import org.apache.doris.load.MiniEtlTaskInfo;
import org.apache.doris.load.EtlStatus;
import org.apache.doris.load.LoadJob;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.TMiniLoadEtlStatusResult;
import org.apache.doris.thrift.TEtlState;
import org.apache.doris.thrift.TStatus;
import org.apache.doris.thrift.TStatusCode;
import com.google.common.collect.Maps;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
import java.util.Map;
public class MiniLoadEtlTask extends LoadEtlTask {
private static final Logger LOG = LogManager.getLogger(MiniLoadEtlTask.class);
public MiniLoadEtlTask(LoadJob job) {
super(job);
}
@Override
protected boolean updateJobEtlStatus() {
// update etl tasks status
if (job.miniNeedGetTaskStatus()) {
LOG.debug("get mini etl task status actively. job: {}", job);
for (MiniEtlTaskInfo taskInfo : job.getMiniEtlTasks().values()) {
TEtlState etlState = taskInfo.getTaskStatus().getState();
if (etlState == TEtlState.RUNNING) {
updateEtlTaskStatus(taskInfo);
}
}
}
// update etl job status
updateEtlJobStatus();
return true;
}
private boolean updateEtlTaskStatus(MiniEtlTaskInfo taskInfo) {
// get etl status
TMiniLoadEtlStatusResult result = getMiniLoadEtlStatus(taskInfo.getBackendId(), taskInfo.getId());
LOG.info("mini load etl status: {}, job: {}", result, job);
if (result == null) {
return false;
}
TStatus tStatus = result.getStatus();
if (tStatus.getStatusCode() != TStatusCode.OK) {
LOG.warn("get buck load etl status fail. msg: {}, job: {}", tStatus.getErrorMsgs(), job);
return false;
}
// update etl task status
EtlStatus taskStatus = taskInfo.getTaskStatus();
if (taskStatus.setState(result.getEtlState())) {
if (result.isSetCounters()) {
taskStatus.setCounters(result.getCounters());
}
if (result.isSetTrackingUrl()) {
taskStatus.setTrackingUrl(result.getTrackingUrl());
}
if (result.isSetFileMap()) {
taskStatus.setFileMap(result.getFileMap());
}
}
return true;
}
private TMiniLoadEtlStatusResult getMiniLoadEtlStatus(long backendId, long taskId) {
TMiniLoadEtlStatusResult result = null;
Backend backend = Catalog.getCurrentSystemInfo().getBackend(backendId);
if (backend == null || !backend.isAlive()) {
String failMsg = "backend is null or is not alive";
LOG.error(failMsg);
return result;
}
AgentClient client = new AgentClient(backend.getHost(), backend.getBePort());
return client.getEtlStatus(job.getId(), taskId);
}
private boolean updateEtlJobStatus() {
boolean hasCancelledTask = false;
boolean hasRunningTask = false;
long normalNum = 0;
long abnormalNum = 0;
Map<String, Long> fileMap = Maps.newHashMap();
String trackingUrl = EtlStatus.DEFAULT_TRACKING_URL;
EtlStatus etlJobStatus = job.getEtlJobStatus();
for (MiniEtlTaskInfo taskInfo : job.getMiniEtlTasks().values()) {
EtlStatus taskStatus = taskInfo.getTaskStatus();
switch (taskStatus.getState()) {
case RUNNING:
hasRunningTask = true;
break;
case CANCELLED:
hasCancelledTask = true;
break;
case FINISHED:
// counters and file list
Map<String, String> counters = taskStatus.getCounters();
if (counters.containsKey(DPP_NORMAL_ALL)) {
normalNum += Long.parseLong(counters.get(DPP_NORMAL_ALL));
}
if (counters.containsKey(DPP_ABNORMAL_ALL)) {
abnormalNum += Long.parseLong(counters.get(DPP_ABNORMAL_ALL));
}
fileMap.putAll(taskStatus.getFileMap());
if (!taskStatus.getTrackingUrl().equals(EtlStatus.DEFAULT_TRACKING_URL)) {
trackingUrl = taskStatus.getTrackingUrl();
}
break;
default:
break;
}
}
if (hasCancelledTask) {
etlJobStatus.setState(TEtlState.CANCELLED);
} else if (hasRunningTask) {
etlJobStatus.setState(TEtlState.RUNNING);
} else {
etlJobStatus.setState(TEtlState.FINISHED);
Map<String, String> counters = Maps.newHashMap();
counters.put(DPP_NORMAL_ALL, String.valueOf(normalNum));
counters.put(DPP_ABNORMAL_ALL, String.valueOf(abnormalNum));
etlJobStatus.setCounters(counters);
etlJobStatus.setFileMap(fileMap);
etlJobStatus.setTrackingUrl(trackingUrl);
}
return true;
}
@Override
protected void processEtlRunning() throws LoadException {
// update mini etl job progress
int finishedTaskNum = 0;
Map<Long, MiniEtlTaskInfo> idToEtlTask = job.getMiniEtlTasks();
for (MiniEtlTaskInfo taskInfo : idToEtlTask.values()) {
EtlStatus taskStatus = taskInfo.getTaskStatus();
if (taskStatus.getState() == TEtlState.FINISHED) {
++finishedTaskNum;
}
}
int progress = (int) (finishedTaskNum * 100 / idToEtlTask.size());
if (progress >= 100) {
// set progress to 100 when status is FINISHED
progress = 99;
}
job.setProgress(progress);
}
@Override
protected Map<String, Pair<String, Long>> getFilePathMap() throws LoadException {
Map<String, Long> fileMap = job.getEtlJobStatus().getFileMap();
if (fileMap == null) {
throw new LoadException("get etl files error");
}
Map<String, Pair<String, Long>> filePathMap = Maps.newHashMap();
for (Map.Entry<String, Long> entry : fileMap.entrySet()) {
String partitionIndexBucket = getPartitionIndexBucketString(entry.getKey());
// http://host:8000/data/dir/file
filePathMap.put(partitionIndexBucket, Pair.create(entry.getKey(), entry.getValue()));
}
return filePathMap;
}
}

View File

@@ -1,216 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.task;
import org.apache.doris.analysis.DescriptorTable;
import org.apache.doris.analysis.SlotDescriptor;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.common.LoadException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.load.EtlSubmitResult;
import org.apache.doris.load.LoadErrorHub;
import org.apache.doris.load.LoadJob;
import org.apache.doris.load.MiniEtlTaskInfo;
import org.apache.doris.load.TableLoadInfo;
import org.apache.doris.planner.CsvScanNode;
import org.apache.doris.planner.DataPartition;
import org.apache.doris.planner.DataSplitSink;
import org.apache.doris.planner.PlanFragment;
import org.apache.doris.planner.PlanFragmentId;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.planner.ScanNode;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.PaloInternalServiceVersion;
import org.apache.doris.thrift.TAgentResult;
import org.apache.doris.thrift.TAgentServiceVersion;
import org.apache.doris.thrift.TExecPlanFragmentParams;
import org.apache.doris.thrift.TLoadErrorHubInfo;
import org.apache.doris.thrift.TMiniLoadEtlTaskRequest;
import org.apache.doris.thrift.TPlanFragmentExecParams;
import org.apache.doris.thrift.TQueryOptions;
import org.apache.doris.thrift.TQueryType;
import org.apache.doris.thrift.TStatus;
import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.thrift.TUniqueId;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.List;
import java.util.Map;
public class MiniLoadPendingTask extends LoadPendingTask {
private static final Logger LOG = LogManager.getLogger(MiniLoadPendingTask.class);
// descriptor used to register all column and table need
private final DescriptorTable desc;
// destination Db and table get from request
// Data will load to this table
private OlapTable destTable;
// dest desc
private TupleDescriptor destTupleDesc;
private DataSplitSink tableSink;
private List<Pair<Long, TMiniLoadEtlTaskRequest>> requests;
public MiniLoadPendingTask(LoadJob job) {
super(job);
this.desc = new DescriptorTable();
}
@Override
protected void createEtlRequest() throws Exception {
requests = Lists.newArrayList();
for (MiniEtlTaskInfo taskInfo : job.getMiniEtlTasks().values()) {
long taskId = taskInfo.getId();
long backendId = taskInfo.getBackendId();
long tableId = taskInfo.getTableId();
// All the following operation will process when destTable's read lock are hold.
destTable = (OlapTable) db.getTable(tableId);
if (destTable == null) {
throw new LoadException("table does not exist. id: " + tableId);
}
destTable.readLock();
try {
registerToDesc();
tableSink = new DataSplitSink(destTable, destTupleDesc);
// add schema hash to table load info
TableLoadInfo tableLoadInfo = job.getTableLoadInfo(destTable.getId());
for (Map.Entry<Long, Integer> entry : destTable.getIndexIdToSchemaHash().entrySet()) {
tableLoadInfo.addIndexSchemaHash(entry.getKey(), entry.getValue());
}
requests.add(new Pair<Long, TMiniLoadEtlTaskRequest>(backendId, createRequest(taskId)));
} finally {
destTable.readUnlock();
}
}
}
@Override
protected EtlSubmitResult submitEtlJob(int retry) {
LOG.info("begin submit mini load etl job: {}", job);
for (Pair<Long, TMiniLoadEtlTaskRequest> pair : requests) {
long backendId = pair.first;
TMiniLoadEtlTaskRequest request = pair.second;
Backend backend = Catalog.getCurrentSystemInfo().getBackend(backendId);
if (!Catalog.getCurrentSystemInfo().checkBackendAvailable(backendId)) {
String failMsg = "backend is null or is not alive";
LOG.error(failMsg);
TStatus tStatus = new TStatus(TStatusCode.CANCELLED);
tStatus.setErrorMsgs(Lists.newArrayList(failMsg));
return new EtlSubmitResult(tStatus, null);
}
AgentClient client = new AgentClient(backend.getHost(), backend.getBePort());
TAgentResult submitResult = client.submitEtlTask(request);
if (submitResult.getStatus().getStatusCode() != TStatusCode.OK) {
return new EtlSubmitResult(submitResult.getStatus(), null);
}
}
return new EtlSubmitResult(new TStatus(TStatusCode.OK), null);
}
private void registerToDesc() {
destTupleDesc = desc.createTupleDescriptor();
destTupleDesc.setTable(destTable);
// Lock database and get its schema hash??
// Make sure that import job has its corresponding schema
for (Column col : destTable.getBaseSchema()) {
SlotDescriptor slot = desc.addSlotDescriptor(destTupleDesc);
// All this slot is needed
slot.setIsMaterialized(true);
slot.setColumn(col);
if (true == col.isAllowNull()) {
slot.setIsNullable(true);
} else {
slot.setIsNullable(false);
}
}
}
private TMiniLoadEtlTaskRequest createRequest(long taskId) throws LoadException {
ScanNode csvScanNode = new CsvScanNode(new PlanNodeId(0), destTupleDesc, destTable, job);
desc.computeMemLayout();
try {
csvScanNode.finalize(null);
} catch (UserException e) {
LOG.warn("csvScanNode finalize failed[err={}]", e);
throw new LoadException("CSV scan finalize failed.", e);
}
PlanFragment fragment = new PlanFragment(new PlanFragmentId(0), csvScanNode, DataPartition.UNPARTITIONED);
fragment.setSink(tableSink);
try {
fragment.finalize(null, false);
} catch (Exception e) {
LOG.info("fragment finalize failed.e = {}", e);
throw new LoadException("Fragment finalize failed.", e);
}
TMiniLoadEtlTaskRequest request = new TMiniLoadEtlTaskRequest();
request.setProtocolVersion(TAgentServiceVersion.V1);
TExecPlanFragmentParams params = new TExecPlanFragmentParams();
params.setProtocolVersion(PaloInternalServiceVersion.V1);
params.setFragment(fragment.toThrift());
params.setDescTbl(desc.toThrift());
params.setImportLabel(job.getLabel());
params.setDbName(db.getFullName());
params.setLoadJobId(job.getId());
LoadErrorHub.Param param = load.getLoadErrorHubInfo();
if (param != null) {
TLoadErrorHubInfo info = param.toThrift();
if (info != null) {
params.setLoadErrorHubInfo(info);
}
}
TPlanFragmentExecParams execParams = new TPlanFragmentExecParams();
// Only use fragment id
TUniqueId uniqueId = new TUniqueId(job.getId(), taskId);
execParams.setQueryId(new TUniqueId(uniqueId));
execParams.setFragmentInstanceId(uniqueId);
execParams.per_node_scan_ranges = Maps.newHashMap();
execParams.per_exch_num_senders = Maps.newHashMap();
execParams.destinations = Lists.newArrayList();
params.setParams(execParams);
TQueryOptions queryOptions = new TQueryOptions();
queryOptions.setQueryType(TQueryType.LOAD);
params.setQueryOptions(queryOptions);
request.setParams(params);
return request;
}
}

View File

@@ -1,139 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.task;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.common.LoadException;
import org.apache.doris.common.Pair;
import org.apache.doris.load.EtlStatus;
import org.apache.doris.load.LoadJob;
import org.apache.doris.thrift.TEtlState;
import com.google.common.collect.Maps;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.Map;
// Used to process pull load etl task
@Deprecated
public class PullLoadEtlTask extends LoadEtlTask {
private static final Logger LOG = LogManager.getLogger(PullLoadEtlTask.class);
private PullLoadJobMgr mgr;
public PullLoadEtlTask(LoadJob job) {
super(job);
mgr = Catalog.getCurrentCatalog().getPullLoadJobMgr();
}
@Override
protected String getErrorMsg() {
String errMsg = null;
PullLoadJob pullLoadJob = mgr.getJob(job.getId());
if (pullLoadJob != null) {
PullLoadTask failureTask = pullLoadJob.getFailureTask();
if (failureTask != null) {
if (failureTask.getExecuteStatus() != null) {
errMsg = "Broker etl failed: " + failureTask.getExecuteStatus().getErrorMsg();
}
}
}
return errMsg != null ? errMsg : super.getErrorMsg();
}
@Override
protected boolean updateJobEtlStatus() {
PullLoadJob pullLoadJob = mgr.getJob(job.getId());
EtlStatus etlStatus = job.getEtlJobStatus();
if (pullLoadJob == null) {
LOG.warn("pullLoadJob is null. JobId is {}", job.getId());
return false;
}
switch (pullLoadJob.getState()) {
case CANCELED:
case FAILED:
etlStatus.setState(TEtlState.CANCELLED);
break;
case FINISHED:
updateFinishInfo(pullLoadJob);
etlStatus.setState(TEtlState.FINISHED);
break;
case RUNNING:
etlStatus.setState(TEtlState.RUNNING);
break;
default:
etlStatus.setState(TEtlState.UNKNOWN);
break;
}
return true;
}
private void updateFinishInfo(PullLoadJob pullLoadJob) {
Map<String, Long> fileMap = Maps.newHashMap();
long numRowsNormal = 0;
long numRowsAbnormal = 0;
String trackingUrl = null;
for (PullLoadTask task : pullLoadJob.tasks) {
fileMap.putAll(task.getFileMap());
String value = task.getCounters().get(DPP_NORMAL_ALL);
if (value != null) {
numRowsNormal += Long.valueOf(value);
}
value = task.getCounters().get(DPP_ABNORMAL_ALL);
if (value != null) {
numRowsAbnormal += Long.valueOf(value);
}
if (trackingUrl == null && task.getTrackingUrl() != null) {
trackingUrl = task.getTrackingUrl();
}
}
Map<String, String> counters = Maps.newHashMap();
counters.put(DPP_NORMAL_ALL, "" + numRowsNormal);
counters.put(DPP_ABNORMAL_ALL, "" + numRowsAbnormal);
EtlStatus etlJobStatus = job.getEtlJobStatus();
etlJobStatus.setFileMap(fileMap);
etlJobStatus.setCounters(counters);
if (trackingUrl != null) {
etlJobStatus.setTrackingUrl(trackingUrl);
}
}
@Override
protected void processEtlRunning() throws LoadException {
}
@Override
protected Map<String, Pair<String, Long>> getFilePathMap() throws LoadException {
Map<String, Long> fileMap = job.getEtlJobStatus().getFileMap();
if (fileMap == null) {
throw new LoadException("get etl files error");
}
Map<String, Pair<String, Long>> filePathMap = Maps.newHashMap();
for (Map.Entry<String, Long> entry : fileMap.entrySet()) {
String partitionIndexBucket = getPartitionIndexBucketString(entry.getKey());
// http://host:8000/data/dir/file
filePathMap.put(partitionIndexBucket, Pair.create(entry.getKey(), entry.getValue()));
}
return filePathMap;
}
}

View File

@@ -1,110 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.task;
import org.apache.doris.load.LoadJob;
import com.google.common.collect.Sets;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.List;
import java.util.Set;
// One pull load job
@Deprecated
public class PullLoadJob {
private static final Logger LOG = LogManager.getLogger(PullLoadTask.class);
public enum State {
RUNNING,
FINISHED,
CANCELED,
FAILED,
UNKNOWN;
public boolean isRunning() {
return this == RUNNING;
}
}
// Input params
public final LoadJob job;
public final List<PullLoadTask> tasks;
public Set<Integer> finishedTask;
// Used to
public volatile State state;
// Only used when this job has failed.
public PullLoadTask failureTask;
public PullLoadJob(LoadJob job, List<PullLoadTask> tasks) {
this.job = job;
this.tasks = tasks;
finishedTask = Sets.newHashSet();
state = State.RUNNING;
}
public long getId() {
return job.getId();
}
public synchronized State getState() {
return state;
}
public synchronized boolean isRunning() {
return state.isRunning();
}
public synchronized void cancel() {
state = State.CANCELED;
for (PullLoadTask task : tasks) {
task.cancel();
}
}
public PullLoadTask getFailureTask() {
return failureTask;
}
public synchronized void onTaskFinished(PullLoadTask task) {
int taskId = task.taskId;
if (!state.isRunning()) {
LOG.info("Ignore task info after this job has been stable. taskId={}:{}", task.jobId, taskId);
return;
}
if (!finishedTask.add(taskId)) {
LOG.info("Receive duplicate task information. taskId={}:{}", task.jobId, taskId);
}
if (finishedTask.size() == tasks.size()) {
state = State.FINISHED;
}
}
public synchronized void onTaskFailed(PullLoadTask task) {
if (!state.isRunning()) {
LOG.info("Ignore task info after this job has been stable. taskId={}:{}", task.jobId, task.taskId);
return;
}
state = State.FAILED;
failureTask = task;
}
}

View File

@@ -1,200 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.task;
import org.apache.doris.common.Status;
import org.apache.doris.common.ThreadPoolManager;
import org.apache.doris.common.UserException;
import org.apache.doris.thrift.TStatusCode;
import com.google.common.collect.Maps;
import com.google.common.collect.Queues;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.locks.ReentrantLock;
@Deprecated
public class PullLoadJobMgr {
private static final Logger LOG = LogManager.getLogger(PullLoadJobMgr.class);
// Lock protect
private final ReentrantLock lock = new ReentrantLock();
private final Map<Long, PullLoadJob> idToJobs = Maps.newHashMap();
private final BlockingQueue<PullLoadTask> pendingTasks = Queues.newLinkedBlockingQueue();
private ExecutorService executorService;
private int concurrency = 10;
public PullLoadJobMgr(boolean needRegisterMetric) {
executorService = ThreadPoolManager.newDaemonCacheThreadPool(concurrency, "pull-load-job-mgr", needRegisterMetric);
}
/**
* Start Task manager to work.
* First it will ask all backends to collect task status
* After collected, this will start scheduler to work.
*/
public void start() {
for (int i = 0; i < concurrency; ++i) {
executorService.submit(new TaskExecutor());
}
}
/**
* Submit a load job.
* This is called when a job turn from pending to ETL.
* Or this can be called before 'start' called.
*/
public void submit(PullLoadJob job) {
lock.lock();
try {
if (idToJobs.containsKey(job.getId())) {
// Same job id contains
return;
}
idToJobs.put(job.getId(), job);
for (PullLoadTask task : job.tasks) {
pendingTasks.add(task);
}
} finally {
lock.unlock();
}
}
public PullLoadJob.State getJobState(long jobId) {
lock.lock();
try {
PullLoadJob ctx = idToJobs.get(jobId);
if (ctx == null) {
return PullLoadJob.State.UNKNOWN;
}
return ctx.state;
} finally {
lock.unlock();
}
}
// NOTE:
// Must call this after job's state is not running
// Otherwise, the returned object will be process concurrently,
// this could be lead to an invalid situation
public PullLoadJob getJob(long jobId) {
lock.lock();
try {
return idToJobs.get(jobId);
} finally {
lock.unlock();
}
}
// Cancel one job, remove all its tasks
public void cancelJob(long jobId) {
PullLoadJob job = null;
lock.lock();
try {
job = idToJobs.remove(jobId);
} finally {
lock.unlock();
}
// Cancel job out of lock guard
if (job != null) {
job.cancel();
}
}
// Only used when master replay job
public void remove(long jobId) {
lock.lock();
try {
idToJobs.remove(jobId);
} finally {
lock.unlock();
}
}
public boolean isFailureCanRetry(Status status) {
return true;
}
public class TaskExecutor implements Runnable {
private void processOneTask(PullLoadTask task, PullLoadJob job) throws UserException {
int retryTime = 3;
for (int i = 0; i < retryTime; ++i) {
if (!job.isRunning()) {
throw new UserException("Job has been cancelled.");
}
task.executeOnce();
if (task.isFinished()) {
return;
} else {
boolean needRetry = isFailureCanRetry(task.getExecuteStatus());
if (!needRetry) {
break;
}
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
}
}
}
}
@Override
public void run() {
while (true) {
PullLoadTask task;
PullLoadJob job;
try {
task = pendingTasks.take();
if (task == null) {
continue;
}
job = getJob(task.jobId);
if (job == null || !job.isRunning()) {
LOG.info("Job is not running now. taskId={}:{}", task.jobId, task.taskId);
continue;
}
} catch (InterruptedException e) {
LOG.info("Interrupted when take task.");
continue;
}
try {
processOneTask(task, job);
if (task.isFinished()) {
job.onTaskFinished(task);
} else {
job.onTaskFailed(task);
}
} catch (Throwable e) {
LOG.warn("Process one pull load task exception. job id: {}, task id: {}",
task.jobId, task.taskId, e);
task.onFailed(null, new Status(TStatusCode.INTERNAL_ERROR, e.getMessage()));
job.onTaskFailed(task);
}
}
}
}
}

View File

@@ -1,237 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.task;
import org.apache.doris.analysis.BrokerDesc;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.common.Config;
import org.apache.doris.common.Status;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.load.BrokerFileGroup;
import org.apache.doris.qe.Coordinator;
import org.apache.doris.qe.QeProcessorImpl;
import org.apache.doris.thrift.TBrokerFileStatus;
import org.apache.doris.thrift.TQueryType;
import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.thrift.TUniqueId;
import com.google.common.collect.Maps;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.List;
import java.util.Map;
import java.util.UUID;
// A pull load task is used to process one table of this pull load job.
@Deprecated
public class PullLoadTask {
private static final Logger LOG = LogManager.getLogger(PullLoadTask.class);
// Input parameter
public final long jobId;
public final int taskId;
public final Database db;
public final OlapTable table;
public final BrokerDesc brokerDesc;
public final List<BrokerFileGroup> fileGroups;
public final long jobDeadlineMs;
private PullLoadTaskPlanner planner;
// Useful things after executed
private Map<String, Long> fileMap;
private String trackingUrl;
private Map<String, String> counters;
private final long execMemLimit;
// Runtime variables
private enum State {
RUNNING,
FINISHED,
FAILED,
CANCELLED,
}
private TUniqueId queryId;
private Coordinator curCoordinator;
private State executeState = State.RUNNING;
private Status executeStatus;
private Thread curThread;
public PullLoadTask(
long jobId, int taskId,
Database db, OlapTable table,
BrokerDesc brokerDesc, List<BrokerFileGroup> fileGroups,
long jobDeadlineMs, long execMemLimit) {
this.jobId = jobId;
this.taskId = taskId;
this.db = db;
this.table = table;
this.brokerDesc = brokerDesc;
this.fileGroups = fileGroups;
this.jobDeadlineMs = jobDeadlineMs;
this.execMemLimit = execMemLimit;
}
public void init(List<List<TBrokerFileStatus>> fileStatusList, int fileNum) throws UserException {
planner = new PullLoadTaskPlanner(this);
planner.plan(fileStatusList, fileNum);
}
public Map<String, Long> getFileMap() {
return fileMap;
}
public String getTrackingUrl() {
return trackingUrl;
}
public Map<String, String> getCounters() {
return counters;
}
private long getLeftTimeMs() {
if (jobDeadlineMs <= 0) {
return Config.broker_load_default_timeout_second * 1000;
}
return jobDeadlineMs - System.currentTimeMillis();
}
public synchronized void cancel() {
if (curCoordinator != null) {
curCoordinator.cancel();
}
}
public synchronized boolean isFinished() {
return executeState == State.FINISHED;
}
public Status getExecuteStatus() {
return executeStatus;
}
public synchronized void onCancelled(String reason) {
if (executeState == State.RUNNING) {
executeState = State.CANCELLED;
executeStatus = Status.CANCELLED;
LOG.info("cancel one pull load task({}). task id: {}, query id: {}, job id: {}",
reason, taskId, DebugUtil.printId(curCoordinator.getQueryId()), jobId);
}
}
public synchronized void onFinished(Map<String, Long> fileMap,
Map<String, String> counters,
String trackingUrl) {
if (executeState == State.RUNNING) {
executeState = State.FINISHED;
executeStatus = Status.OK;
this.fileMap = fileMap;
this.counters = counters;
this.trackingUrl = trackingUrl;
LOG.info("finished one pull load task. task id: {}, query id: {}, job id: {}",
taskId, DebugUtil.printId(curCoordinator.getQueryId()), jobId);
}
}
public synchronized void onFailed(TUniqueId id, Status failStatus) {
if (executeState == State.RUNNING) {
if (id != null && !queryId.equals(id)) {
return;
}
executeState = State.FAILED;
executeStatus = failStatus;
LOG.info("failed one pull load task({}). task id: {}, query id: {}, job id: {}",
failStatus.getErrorMsg(), taskId, id != null ? DebugUtil.printId(id) : "NaN", jobId);
}
}
private void actualExecute() {
int waitSecond = (int) (getLeftTimeMs() / 1000);
if (waitSecond <= 0) {
onCancelled("waiting timeout");
return;
}
// TODO(zc): to refine coordinator
try {
curCoordinator.exec();
} catch (Exception e) {
LOG.warn("pull load task exec failed", e);
onFailed(queryId, new Status(TStatusCode.INTERNAL_ERROR, "Coordinator execute failed: " + e.getMessage()));
return;
}
if (curCoordinator.join(waitSecond)) {
Status status = curCoordinator.getExecStatus();
if (status.ok()) {
Map<String, Long> resultFileMap = Maps.newHashMap();
for (String file : curCoordinator.getDeltaUrls()) {
resultFileMap.put(file, -1L);
}
onFinished(resultFileMap, curCoordinator.getLoadCounters(), curCoordinator.getTrackingUrl());
} else {
onFailed(queryId, status);
}
} else {
onCancelled("execution timeout");
}
}
public void executeOnce() throws UserException {
synchronized (this) {
if (curThread != null) {
throw new UserException("Task already executing.");
}
curThread = Thread.currentThread();
executeState = State.RUNNING;
executeStatus = Status.OK;
// New one query id,
UUID uuid = UUID.randomUUID();
queryId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
curCoordinator = new Coordinator(jobId, queryId, planner.getDescTable(),
planner.getFragments(), planner.getScanNodes(), db.getClusterName(), TimeUtils.DEFAULT_TIME_ZONE);
curCoordinator.setQueryType(TQueryType.LOAD);
curCoordinator.setExecMemoryLimit(execMemLimit);
curCoordinator.setTimeout((int) (getLeftTimeMs() / 1000));
}
boolean needUnregister = false;
try {
QeProcessorImpl.INSTANCE.registerQuery(queryId, curCoordinator);
actualExecute();
needUnregister = true;
} catch (UserException e) {
onFailed(queryId, new Status(TStatusCode.INTERNAL_ERROR, e.getMessage()));
} finally {
if (needUnregister) {
QeProcessorImpl.INSTANCE.unregisterQuery(queryId);
}
synchronized (this) {
curThread = null;
curCoordinator = null;
}
}
}
}

View File

@@ -1,160 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.task;
import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.DescriptorTable;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.SlotDescriptor;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.NotImplementedException;
import org.apache.doris.common.UserException;
import org.apache.doris.planner.BrokerScanNode;
import org.apache.doris.planner.DataPartition;
import org.apache.doris.planner.DataSplitSink;
import org.apache.doris.planner.ExchangeNode;
import org.apache.doris.planner.OlapRewriteNode;
import org.apache.doris.planner.PlanFragment;
import org.apache.doris.planner.PlanFragmentId;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.planner.ScanNode;
import org.apache.doris.thrift.TBrokerFileStatus;
import com.google.common.collect.Lists;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.Collections;
import java.util.List;
// Planner used to generate a plan for pull load ETL work
@Deprecated
public class PullLoadTaskPlanner {
private static final Logger LOG = LogManager.getLogger(PullLoadTaskPlanner.class);
// Input param
private final PullLoadTask task;
// Something useful
private final Analyzer analyzer;
private final DescriptorTable descTable;
// Output param
private final List<PlanFragment> fragments;
private final List<ScanNode> scanNodes;
private int nextNodeId = 0;
public PullLoadTaskPlanner(PullLoadTask task) {
this.task = task;
this.analyzer = new Analyzer(Catalog.getCurrentCatalog(), null);
this.descTable = analyzer.getDescTbl();
this.fragments = Lists.newArrayList();
this.scanNodes = Lists.newArrayList();
}
// NOTE: DB lock need hold when call this function.
public void plan(List<List<TBrokerFileStatus>> fileStatusesList, int filesAdded) throws UserException {
// Tuple descriptor used for all nodes in plan.
OlapTable table = task.table;
// Generate tuple descriptor
List<Expr> slotRefs = Lists.newArrayList();
TupleDescriptor tupleDesc = descTable.createTupleDescriptor();
for (Column col : table.getFullSchema()) {
SlotDescriptor slotDesc = descTable.addSlotDescriptor(tupleDesc);
slotDesc.setIsMaterialized(true);
slotDesc.setColumn(col);
if (col.isAllowNull()) {
slotDesc.setIsNullable(true);
} else {
slotDesc.setIsNullable(false);
}
slotRefs.add(new SlotRef(slotDesc));
}
// Generate plan tree
// 1. first Scan node
BrokerScanNode scanNode = new BrokerScanNode(new PlanNodeId(nextNodeId++), tupleDesc, "BrokerScanNode",
fileStatusesList, filesAdded);
scanNode.setLoadInfo(table, task.brokerDesc, task.fileGroups);
scanNode.init(analyzer);
scanNodes.add(scanNode);
// equal node
OlapRewriteNode rewriteNode = new OlapRewriteNode(
new PlanNodeId(nextNodeId++), scanNode, table, tupleDesc, slotRefs);
rewriteNode.init(analyzer);
descTable.computeMemLayout();
rewriteNode.finalize(analyzer);
PlanFragment scanFragment = new PlanFragment(new PlanFragmentId(0), rewriteNode, DataPartition.RANDOM);
scanNode.setFragmentId(scanFragment.getFragmentId());
scanNode.setFragment(scanFragment);
fragments.add(scanFragment);
// exchange node
ExchangeNode exchangeNode = new ExchangeNode(new PlanNodeId(nextNodeId++), rewriteNode, false);
exchangeNode.init(analyzer);
// Create data sink
DataSplitSink splitSink = null;
try {
splitSink = new DataSplitSink(table, tupleDesc);
} catch (AnalysisException e) {
LOG.info("New DataSplitSink failed.{}", e);
throw new UserException(e.getMessage());
}
PlanFragment sinkFragment = new PlanFragment(new PlanFragmentId(1), exchangeNode, splitSink.getOutputPartition());
scanFragment.setDestination(exchangeNode);
scanFragment.setOutputPartition(splitSink.getOutputPartition());
sinkFragment.setSink(splitSink);
fragments.add(sinkFragment);
// Get partition
for (PlanFragment fragment : fragments) {
try {
fragment.finalize(analyzer, false);
} catch (NotImplementedException e) {
LOG.info("Fragment finalize failed.{}", e);
throw new UserException("Fragment finalize failed.");
}
}
Collections.reverse(fragments);
}
public DescriptorTable getDescTable() {
return descTable;
}
public List<PlanFragment> getFragments() {
return fragments;
}
public List<ScanNode> getScanNodes() {
return scanNodes;
}
}