diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
index 8715704374..e9b643ffc7 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
@@ -206,7 +206,6 @@ import org.apache.doris.task.AgentTaskExecutor;
 import org.apache.doris.task.AgentTaskQueue;
 import org.apache.doris.task.CreateReplicaTask;
 import org.apache.doris.task.MasterTaskExecutor;
-import org.apache.doris.task.PullLoadJobMgr;
 import org.apache.doris.thrift.TStorageFormat;
 import org.apache.doris.thrift.TStorageMedium;
 import org.apache.doris.thrift.TStorageType;
@@ -367,7 +366,6 @@ public class Catalog {
 
     private MetaReplayState metaReplayState;
 
-    private PullLoadJobMgr pullLoadJobMgr;
     private BrokerMgr brokerMgr;
     private ResourceMgr resourceMgr;
@@ -520,7 +518,6 @@ public class Catalog {
 
         this.isDefaultClusterCreated = false;
 
-        this.pullLoadJobMgr = new PullLoadJobMgr(!isCheckpointCatalog);
         this.brokerMgr = new BrokerMgr();
         this.resourceMgr = new ResourceMgr();
@@ -589,10 +586,6 @@ public class Catalog {
         return SingletonHolder.INSTANCE;
     }
 
-    public PullLoadJobMgr getPullLoadJobMgr() {
-        return pullLoadJobMgr;
-    }
-
     public BrokerMgr getBrokerMgr() {
         return brokerMgr;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java
index 9c4104feaf..315ba25c8e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java
@@ -595,7 +595,7 @@ public class Load {
      * This is only used for hadoop load
      */
    public static void checkAndCreateSource(Database db, DataDescription dataDescription,
-            Map<Long, Map<Long, List<Source>>> tableToPartitionSources, EtlJobType jobType) throws DdlException {
+            Map<Long, Map<Long, List<Source>>> tableToPartitionSources, EtlJobType jobType) throws DdlException {
         Source source = new Source(dataDescription.getFilePaths());
         long tableId = -1;
         Set<Long> sourcePartitionIds = Sets.newHashSet();
@@ -779,7 +779,7 @@ public class Load {
             Pair<String, List<String>> function = entry.getValue();
             try {
                 DataDescription.validateMappingFunction(function.first, function.second, columnNameMap,
-                        mappingColumn, dataDescription.isHadoopLoad());
+                        mappingColumn, dataDescription.isHadoopLoad());
             } catch (AnalysisException e) {
                 throw new DdlException(e.getMessage());
             }
@@ -883,7 +883,7 @@ public class Load {
                      * (A, B, C) SET (__doris_shadow_B = B)
                      */
                     ImportColumnDesc importColumnDesc = new ImportColumnDesc(column.getName(),
-                            new SlotRef(null, originCol));
+                            new SlotRef(null, originCol));
                     shadowColumnDescs.add(importColumnDesc);
                 }
             } else {
@@ -918,7 +918,7 @@ public class Load {
                                    Map<String, SlotDescriptor> slotDescByName, TBrokerScanRangeParams params) throws UserException {
         rewriteColumns(columnExprs);
         initColumns(tbl, columnExprs, columnToHadoopFunction, exprsByName, analyzer,
-                srcTupleDesc, slotDescByName, params, true);
+                srcTupleDesc, slotDescByName, params, true);
     }
 
     /*
@@ -946,7 +946,7 @@ public class Load {
         }
         // check whether the OlapTable has sequenceCol
         boolean hasSequenceCol = false;
-        if (tbl instanceof OlapTable && ((OlapTable)tbl).hasSequenceCol()) {
+        if (tbl instanceof OlapTable && ((OlapTable) tbl).hasSequenceCol()) {
             hasSequenceCol = true;
         }
@@ -1771,12 +1771,7 @@ public class Load {
             clearJob(job, srcState);
         }
 
-        if (job.getBrokerDesc() != null) {
-            if (srcState == JobState.ETL) {
-                // Cancel job id
-                Catalog.getCurrentCatalog().getPullLoadJobMgr().cancelJob(job.getId());
-            }
-        }
+        Preconditions.checkState(job.getBrokerDesc() == null);
 
         LOG.info("cancel load job success. job: {}", job);
         return true;
     }
@@ -1925,14 +1920,14 @@ public class Load {
             if (tableNames.isEmpty()) {
                 // forward compatibility
                 if (!Catalog.getCurrentCatalog().getAuth().checkDbPriv(ConnectContext.get(), dbName,
-                        PrivPredicate.LOAD)) {
+                        PrivPredicate.LOAD)) {
                     continue;
                 }
             } else {
                 boolean auth = true;
                 for (String tblName : tableNames) {
                     if (!Catalog.getCurrentCatalog().getAuth().checkTblPriv(ConnectContext.get(), dbName,
-                            tblName, PrivPredicate.LOAD)) {
+                            tblName, PrivPredicate.LOAD)) {
                         auth = false;
                         break;
                     }
@@ -2004,8 +1999,8 @@ public class Load {
 
                 // task info
                 jobInfo.add("cluster:" + loadJob.getHadoopCluster()
-                        + "; timeout(s):" + loadJob.getTimeoutSecond()
-                        + "; max_filter_ratio:" + loadJob.getMaxFilterRatio());
+                        + "; timeout(s):" + loadJob.getTimeoutSecond()
+                        + "; max_filter_ratio:" + loadJob.getMaxFilterRatio());
 
                 // error msg
                 if (loadJob.getState() == JobState.CANCELLED) {
@@ -2330,7 +2325,7 @@ public class Load {
                     continue;
                 }
                 replica.updateVersionInfo(info.getVersion(), info.getVersionHash(),
-                        info.getDataSize(), info.getRowCount());
+                        info.getDataSize(), info.getRowCount());
             }
         }
@@ -2349,7 +2344,7 @@ public class Load {
                     continue;
                 }
                 updatePartitionVersion(partition, partitionLoadInfo.getVersion(),
-                        partitionLoadInfo.getVersionHash(), jobId);
+                        partitionLoadInfo.getVersionHash(), jobId);
 
                 // update table row count
                 for (MaterializedIndex materializedIndex : partition.getMaterializedIndices(IndexExtState.ALL)) {
@@ -2446,7 +2441,7 @@ public class Load {
                         continue;
                     }
                     replica.updateVersionInfo(info.getVersion(), info.getVersionHash(),
-                            info.getDataSize(), info.getRowCount());
+                            info.getDataSize(), info.getRowCount());
                 }
             }
         } else {
@@ -2652,7 +2647,7 @@ public class Load {
             // hdfs://host:port/outputPath/dbId/loadLabel/
             DppConfig dppConfig = job.getHadoopDppConfig();
             String outputPath = DppScheduler.getEtlOutputPath(dppConfig.getFsDefaultName(),
-                    dppConfig.getOutputPath(), job.getDbId(), job.getLabel(), "");
+                    dppConfig.getOutputPath(), job.getDbId(), job.getLabel(), "");
             try {
                 dppScheduler.deleteEtlOutputPath(outputPath);
             } catch (Exception e) {
@@ -2696,7 +2691,7 @@ public class Load {
     }
 
     public boolean updateLoadJobState(LoadJob job, JobState destState, CancelType cancelType, String msg,
-            List<String> failedMsg) {
+            List<String> failedMsg) {
         boolean result = true;
         JobState srcState = null;
@@ -2855,7 +2850,7 @@ public class Load {
                 }
 
                 updatePartitionVersion(partition, partitionLoadInfo.getVersion(),
-                        partitionLoadInfo.getVersionHash(), jobId);
+                        partitionLoadInfo.getVersionHash(), jobId);
 
                 for (MaterializedIndex materializedIndex : partition.getMaterializedIndices(IndexExtState.ALL)) {
                     long tableRowCount = 0L;
@@ -2889,7 +2884,7 @@ public class Load {
         long partitionId = partition.getId();
         partition.updateVisibleVersionAndVersionHash(version, versionHash);
         LOG.info("update partition version success. version: {}, version hash: {}, job id: {}, partition id: {}",
-                 version, versionHash, jobId, partitionId);
+                version, versionHash, jobId, partitionId);
     }
 
     private boolean processCancelled(LoadJob job, CancelType cancelType, String msg, List<String> failedMsg) {
@@ -2961,8 +2956,8 @@ public class Load {
         if (srcState == JobState.LOADING || srcState == JobState.QUORUM_FINISHED) {
             for (PushTask pushTask : job.getPushTasks()) {
                 AgentTaskQueue.removePushTask(pushTask.getBackendId(), pushTask.getSignature(),
-                                              pushTask.getVersion(), pushTask.getVersionHash(),
-                                              pushTask.getPushType(), pushTask.getTaskType());
+                        pushTask.getVersion(), pushTask.getVersionHash(),
+                        pushTask.getPushType(), pushTask.getTaskType());
             }
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/LoadChecker.java b/fe/fe-core/src/main/java/org/apache/doris/load/LoadChecker.java
index 2cbcd1091e..0b8d847b97 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/LoadChecker.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/LoadChecker.java
@@ -39,11 +39,8 @@ import org.apache.doris.task.AgentTaskExecutor;
 import org.apache.doris.task.AgentTaskQueue;
 import org.apache.doris.task.HadoopLoadEtlTask;
 import org.apache.doris.task.HadoopLoadPendingTask;
-import org.apache.doris.task.InsertLoadEtlTask;
 import org.apache.doris.task.MasterTask;
 import org.apache.doris.task.MasterTaskExecutor;
-import org.apache.doris.task.MiniLoadEtlTask;
-import org.apache.doris.task.MiniLoadPendingTask;
 import org.apache.doris.task.PushTask;
 import org.apache.doris.thrift.TPriority;
 import org.apache.doris.thrift.TPushType;
@@ -172,9 +169,6 @@ public class LoadChecker extends MasterDaemon {
             case HADOOP:
                 task = new HadoopLoadPendingTask(job);
                 break;
-            case MINI:
-                task = new MiniLoadPendingTask(job);
-                break;
             default:
                 LOG.warn("unknown etl job type. type: {}", etlJobType.name());
                 break;
@@ -200,12 +194,6 @@ public class LoadChecker extends MasterDaemon {
             case HADOOP:
                 task = new HadoopLoadEtlTask(job);
                 break;
-            case MINI:
-                task = new MiniLoadEtlTask(job);
-                break;
-            case INSERT:
-                task = new InsertLoadEtlTask(job);
-                break;
             default:
                 LOG.warn("unknown etl job type. type: {}", etlJobType.name());
                 break;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/CsvScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/CsvScanNode.java
deleted file mode 100644
index 04aab412cd..0000000000
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/CsvScanNode.java
+++ /dev/null
@@ -1,208 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
- -package org.apache.doris.planner; - -import org.apache.doris.analysis.Analyzer; -import org.apache.doris.analysis.TupleDescriptor; -import org.apache.doris.catalog.Column; -import org.apache.doris.catalog.OlapTable; -import org.apache.doris.common.Pair; -import org.apache.doris.common.UserException; -import org.apache.doris.load.LoadJob; -import org.apache.doris.load.PartitionLoadInfo; -import org.apache.doris.load.Source; -import org.apache.doris.load.TableLoadInfo; -import org.apache.doris.thrift.TColumnType; -import org.apache.doris.thrift.TCsvScanNode; -import org.apache.doris.thrift.TMiniLoadEtlFunction; -import org.apache.doris.thrift.TPlanNode; -import org.apache.doris.thrift.TPlanNodeType; -import org.apache.doris.thrift.TScanRangeLocations; - -import com.google.common.base.MoreObjects; -import com.google.common.base.MoreObjects.ToStringHelper; -import com.google.common.base.Preconditions; -import com.google.common.base.Strings; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; - -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -import java.util.Collection; -import java.util.List; -import java.util.Map; - -public class CsvScanNode extends ScanNode { - private static final Logger LOG = LogManager.getLogger(CsvScanNode.class); - - private final OlapTable table; - private final LoadJob job; - - private List filePaths = Lists.newArrayList(); - - private String columnSeparator; - private String lineDelimiter; - - private List columns = Lists.newArrayList(); - private List unspecifiedColumns = Lists.newArrayList(); - private List defaultValues = Lists.newArrayList(); - - private Map columnTypeMapping = Maps.newHashMap(); - private Map columnToFunction = Maps.newHashMap(); - - private double maxFilterRatio = 0.0; - - public CsvScanNode(PlanNodeId id, TupleDescriptor desc, OlapTable table, LoadJob job) { - super(id, desc, "Scan CSV"); - this.table = table; - this.job = job; - } - - @Override - protected void toThrift(TPlanNode msg) { - msg.node_type = TPlanNodeType.CSV_SCAN_NODE; - msg.csv_scan_node = new TCsvScanNode(desc.getId().asInt(), filePaths); - - if (!Strings.isNullOrEmpty(columnSeparator)) { - msg.csv_scan_node.setColumnSeparator(columnSeparator); - } - if (!Strings.isNullOrEmpty(lineDelimiter)) { - msg.csv_scan_node.setLineDelimiter(lineDelimiter); - } - - if (!columns.isEmpty()) { - msg.csv_scan_node.setColumns(columns); - } - if (!unspecifiedColumns.isEmpty()) { - msg.csv_scan_node.setUnspecifiedColumns(unspecifiedColumns); - } - if (!defaultValues.isEmpty()) { - msg.csv_scan_node.setDefaultValues(defaultValues); - } - - if (!columnToFunction.isEmpty()) { - msg.csv_scan_node.setColumnFunctionMapping(columnToFunction); - } - msg.csv_scan_node.setColumnTypeMapping(columnTypeMapping); - msg.csv_scan_node.setMaxFilterRatio(maxFilterRatio); - msg.csv_scan_node.setColumnSeparator(columnSeparator); - } - - @Override - public void finalize(Analyzer analyzer) throws UserException { - // get file paths - // file paths in different partitions are same in mini load - TableLoadInfo tableLoadInfo = job.getTableLoadInfo(table.getId()); - Collection partitionLoadInfos = tableLoadInfo.getIdToPartitionLoadInfo().values(); - Preconditions.checkState(!partitionLoadInfos.isEmpty()); - PartitionLoadInfo partitionLoadInfo = (PartitionLoadInfo) partitionLoadInfos.toArray()[0]; - List sources = partitionLoadInfo.getSources(); - Preconditions.checkState(sources.size() == 1); - Source source = sources.get(0); - filePaths = 
source.getFileUrls(); - - // column separator - columnSeparator = source.getColumnSeparator(); - - // line delimiter - lineDelimiter = source.getLineDelimiter(); - - // construct columns (specified & unspecified) and default-values - List columnNames = Lists.newArrayList(); - for (Column column : table.getBaseSchema()) { - columnNames.add(column.getName()); - } - columns = source.getColumnNames(); - if (columns.isEmpty()) { - columns = columnNames; - } - for (String columnName : columns) { - if (!columnNames.contains(columnName)) { - LOG.info("Column [{}] is not exist in table schema, will be ignored.", columnName); - } - } - for (String columnName : columnNames) { - Column column = table.getColumn(columnName); - columnTypeMapping.put(columnName, column.getOriginType().toColumnTypeThrift()); - - if (columns.contains(columnName)) { - continue; - } - unspecifiedColumns.add(columnName); - String defaultValue = column.getDefaultValue(); - if (defaultValue == null && false == column.isAllowNull()) { - throw new UserException( - "Column [" + columnName + "] should be specified. " - + "only columns have default values can be omitted"); - } - if (true == column.isAllowNull() && null == defaultValue) { - defaultValues.add("\\N"); - } else { - defaultValues.add(defaultValue); - } - } - - Map>> functions = source.getColumnToFunction(); - for (String key : functions.keySet()) { - final Pair> pair = functions.get(key); - TMiniLoadEtlFunction function = new TMiniLoadEtlFunction(); - int paramColumnIndex = -1; - for (String str : pair.second) { - boolean find = false; - for (int i = 0; i < columns.size(); i++) { - if (str.equals(columns.get(i))) { - paramColumnIndex = i; - find = true; - break; - } - } - if (find) { - function.setFunctionName(pair.first); - function.setParamColumnIndex(paramColumnIndex); - columnToFunction.put(key, function); - break; - } - } - } - // max filter ratio - // TODO: remove!! 
- maxFilterRatio = job.getMaxFilterRatio(); - } - - @Override - protected String debugString() { - ToStringHelper helper = MoreObjects.toStringHelper(this); - return helper.addValue(super.debugString()).toString(); - } - - /** - * like Mysql, We query Meta to get request's data location - * extra result info will pass to backend ScanNode - */ - @Override - public List getScanRangeLocations(long maxScanRangeLength) { - return null; - } - - @Override - public int getNumInstances() { - return 1; - } -} - diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java index 16caf11d2d..213cce21c5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java @@ -205,9 +205,7 @@ public class DistributedPlanner { childFragments.get(0)); } else if (root instanceof SelectNode) { result = createSelectNodeFragment((SelectNode) root, childFragments); - } else if (root instanceof OlapRewriteNode) { - result = createOlapRewriteNodeFragment((OlapRewriteNode) root, childFragments); - } else if (root instanceof SetOperationNode) { + } else if (root instanceof SetOperationNode) { result = createSetOperationNodeFragment((SetOperationNode) root, childFragments, fragments); } else if (root instanceof MergeNode) { result = createMergeNodeFragment((MergeNode) root, childFragments, fragments); @@ -838,15 +836,6 @@ public class DistributedPlanner { return childFragment; } - private PlanFragment createOlapRewriteNodeFragment( - OlapRewriteNode olapRewriteNode, ArrayList childFragments) { - Preconditions.checkState(olapRewriteNode.getChildren().size() == childFragments.size()); - PlanFragment childFragment = childFragments.get(0); - olapRewriteNode.setChild(0, childFragment.getPlanRoot()); - childFragment.setPlanRoot(olapRewriteNode); - return childFragment; - } - /** * Replace node's child at index childIdx with an ExchangeNode that receives its input from childFragment. */ diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapRewriteNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapRewriteNode.java deleted file mode 100644 index 95dc1b20ef..0000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapRewriteNode.java +++ /dev/null @@ -1,126 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -package org.apache.doris.planner; - -import com.google.common.collect.Lists; -import org.apache.doris.analysis.Analyzer; -import org.apache.doris.analysis.Expr; -import org.apache.doris.analysis.ExprSubstitutionMap; -import org.apache.doris.analysis.InsertStmt; -import org.apache.doris.analysis.TupleDescriptor; -import org.apache.doris.catalog.Column; -import org.apache.doris.catalog.Table; -import org.apache.doris.common.UserException; -import org.apache.doris.thrift.TExplainLevel; -import org.apache.doris.thrift.TOlapRewriteNode; -import org.apache.doris.thrift.TPlanNode; -import org.apache.doris.thrift.TPlanNodeType; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -import java.util.List; - -// Used to convert column to valid OLAP table -public class OlapRewriteNode extends PlanNode { - private static final Logger LOG = LogManager.getLogger(OlapRewriteNode.class); - - private InsertStmt insertStmt; - - private Table table; - private TupleDescriptor tupleDescriptor; - private List newResultExprs; - - public OlapRewriteNode(PlanNodeId id, PlanNode child, InsertStmt insertStmt) { - super(id, insertStmt.getOlapTuple().getId().asList(), "OLAP REWRITE NODE"); - addChild(child); - - this.table = insertStmt.getTargetTable(); - this.tupleDescriptor = insertStmt.getOlapTuple(); - this.insertStmt = insertStmt; - } - - public OlapRewriteNode(PlanNodeId id, PlanNode child, - Table table, - TupleDescriptor tupleDescriptor, - List slotRefs) { - super(id, child.getTupleIds(), "OLAP REWRITE NODE"); - addChild(child); - this.table = table; - this.tupleDescriptor = tupleDescriptor; - this.newResultExprs = slotRefs; - } - - @Override - public void init(Analyzer analyzer) throws UserException { - assignConjuncts(analyzer); - - // Set smap to the combined children's smaps and apply that to all conjuncts_. 
- createDefaultSmap(analyzer); - - computeStats(analyzer); - // assignedConjuncts = analyzr.getAssignedConjuncts(); - - if (insertStmt != null) { - ExprSubstitutionMap combinedChildSmap = getCombinedChildSmap(); - newResultExprs = Lists.newArrayList(); - for (Expr expr : insertStmt.getResultExprs()) { - newResultExprs.add(expr.clone(combinedChildSmap)); - } - } - } - - @Override - protected void toThrift(TPlanNode msg) { - msg.node_type = TPlanNodeType.OLAP_REWRITE_NODE; - TOlapRewriteNode tnode = new TOlapRewriteNode(); - for (Column column : table.getBaseSchema()) { - tnode.addToColumnTypes(column.getOriginType().toColumnTypeThrift()); - } - for (Expr expr : newResultExprs) { - tnode.addToColumns(expr.treeToThrift()); - } - tnode.setOutputTupleId(tupleDescriptor.getId().asInt()); - msg.setOlapRewriteNode(tnode); - } - - @Override - public void computeStats(Analyzer analyzer) { - super.computeStats(analyzer); - long cardinality = getChild(0).cardinality; - double selectivity = computeSelectivity(); - if (cardinality < 0 || selectivity < 0) { - this.cardinality = -1; - } else { - this.cardinality = Math.round(cardinality * selectivity); - } - } - - @Override - protected String getNodeExplainString(String prefix, TExplainLevel detailLevel) { - StringBuilder output = new StringBuilder(); - if (!conjuncts.isEmpty()) { - output.append(prefix + "predicates: " + getExplainString(conjuncts) + "\n"); - } - return output.toString(); - } - - @Override - public int getNumInstances() { - return children.get(0).getNumInstances(); - } -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/InsertLoadEtlTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/InsertLoadEtlTask.java deleted file mode 100644 index 8a85a9c4b1..0000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/task/InsertLoadEtlTask.java +++ /dev/null @@ -1,38 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.task; - -import org.apache.doris.common.LoadException; -import org.apache.doris.load.LoadJob; - -public class InsertLoadEtlTask extends MiniLoadEtlTask { - - public InsertLoadEtlTask(LoadJob job) { - super(job); - } - - @Override - protected boolean updateJobEtlStatus() { - return true; - } - - @Override - protected void processEtlRunning() throws LoadException { - throw new LoadException("not implement"); - } -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/MiniLoadEtlTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/MiniLoadEtlTask.java deleted file mode 100644 index 9d1743260a..0000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/task/MiniLoadEtlTask.java +++ /dev/null @@ -1,198 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. 
See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.task; - -import org.apache.doris.catalog.Catalog; -import org.apache.doris.common.LoadException; -import org.apache.doris.common.Pair; -import org.apache.doris.load.MiniEtlTaskInfo; -import org.apache.doris.load.EtlStatus; -import org.apache.doris.load.LoadJob; -import org.apache.doris.system.Backend; -import org.apache.doris.thrift.TMiniLoadEtlStatusResult; -import org.apache.doris.thrift.TEtlState; -import org.apache.doris.thrift.TStatus; -import org.apache.doris.thrift.TStatusCode; - -import com.google.common.collect.Maps; - -import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.LogManager; - -import java.util.Map; - -public class MiniLoadEtlTask extends LoadEtlTask { - private static final Logger LOG = LogManager.getLogger(MiniLoadEtlTask.class); - - public MiniLoadEtlTask(LoadJob job) { - super(job); - } - - @Override - protected boolean updateJobEtlStatus() { - // update etl tasks status - if (job.miniNeedGetTaskStatus()) { - LOG.debug("get mini etl task status actively. job: {}", job); - for (MiniEtlTaskInfo taskInfo : job.getMiniEtlTasks().values()) { - TEtlState etlState = taskInfo.getTaskStatus().getState(); - if (etlState == TEtlState.RUNNING) { - updateEtlTaskStatus(taskInfo); - } - } - } - - // update etl job status - updateEtlJobStatus(); - return true; - } - - private boolean updateEtlTaskStatus(MiniEtlTaskInfo taskInfo) { - // get etl status - TMiniLoadEtlStatusResult result = getMiniLoadEtlStatus(taskInfo.getBackendId(), taskInfo.getId()); - LOG.info("mini load etl status: {}, job: {}", result, job); - if (result == null) { - return false; - } - TStatus tStatus = result.getStatus(); - if (tStatus.getStatusCode() != TStatusCode.OK) { - LOG.warn("get buck load etl status fail. 
msg: {}, job: {}", tStatus.getErrorMsgs(), job); - return false; - } - - // update etl task status - EtlStatus taskStatus = taskInfo.getTaskStatus(); - if (taskStatus.setState(result.getEtlState())) { - if (result.isSetCounters()) { - taskStatus.setCounters(result.getCounters()); - } - if (result.isSetTrackingUrl()) { - taskStatus.setTrackingUrl(result.getTrackingUrl()); - } - if (result.isSetFileMap()) { - taskStatus.setFileMap(result.getFileMap()); - } - } - - return true; - } - - private TMiniLoadEtlStatusResult getMiniLoadEtlStatus(long backendId, long taskId) { - TMiniLoadEtlStatusResult result = null; - Backend backend = Catalog.getCurrentSystemInfo().getBackend(backendId); - if (backend == null || !backend.isAlive()) { - String failMsg = "backend is null or is not alive"; - LOG.error(failMsg); - return result; - } - - AgentClient client = new AgentClient(backend.getHost(), backend.getBePort()); - return client.getEtlStatus(job.getId(), taskId); - } - - private boolean updateEtlJobStatus() { - boolean hasCancelledTask = false; - boolean hasRunningTask = false; - long normalNum = 0; - long abnormalNum = 0; - Map fileMap = Maps.newHashMap(); - String trackingUrl = EtlStatus.DEFAULT_TRACKING_URL; - - EtlStatus etlJobStatus = job.getEtlJobStatus(); - for (MiniEtlTaskInfo taskInfo : job.getMiniEtlTasks().values()) { - EtlStatus taskStatus = taskInfo.getTaskStatus(); - switch (taskStatus.getState()) { - case RUNNING: - hasRunningTask = true; - break; - case CANCELLED: - hasCancelledTask = true; - break; - case FINISHED: - // counters and file list - Map counters = taskStatus.getCounters(); - if (counters.containsKey(DPP_NORMAL_ALL)) { - normalNum += Long.parseLong(counters.get(DPP_NORMAL_ALL)); - } - if (counters.containsKey(DPP_ABNORMAL_ALL)) { - abnormalNum += Long.parseLong(counters.get(DPP_ABNORMAL_ALL)); - } - fileMap.putAll(taskStatus.getFileMap()); - if (!taskStatus.getTrackingUrl().equals(EtlStatus.DEFAULT_TRACKING_URL)) { - trackingUrl = taskStatus.getTrackingUrl(); - } - break; - default: - break; - } - } - - if (hasCancelledTask) { - etlJobStatus.setState(TEtlState.CANCELLED); - } else if (hasRunningTask) { - etlJobStatus.setState(TEtlState.RUNNING); - } else { - etlJobStatus.setState(TEtlState.FINISHED); - Map counters = Maps.newHashMap(); - counters.put(DPP_NORMAL_ALL, String.valueOf(normalNum)); - counters.put(DPP_ABNORMAL_ALL, String.valueOf(abnormalNum)); - etlJobStatus.setCounters(counters); - etlJobStatus.setFileMap(fileMap); - etlJobStatus.setTrackingUrl(trackingUrl); - } - - return true; - } - - @Override - protected void processEtlRunning() throws LoadException { - // update mini etl job progress - int finishedTaskNum = 0; - Map idToEtlTask = job.getMiniEtlTasks(); - for (MiniEtlTaskInfo taskInfo : idToEtlTask.values()) { - EtlStatus taskStatus = taskInfo.getTaskStatus(); - if (taskStatus.getState() == TEtlState.FINISHED) { - ++finishedTaskNum; - } - } - - int progress = (int) (finishedTaskNum * 100 / idToEtlTask.size()); - if (progress >= 100) { - // set progress to 100 when status is FINISHED - progress = 99; - } - - job.setProgress(progress); - } - - @Override - protected Map> getFilePathMap() throws LoadException { - Map fileMap = job.getEtlJobStatus().getFileMap(); - if (fileMap == null) { - throw new LoadException("get etl files error"); - } - - Map> filePathMap = Maps.newHashMap(); - for (Map.Entry entry : fileMap.entrySet()) { - String partitionIndexBucket = getPartitionIndexBucketString(entry.getKey()); - // http://host:8000/data/dir/file - 
filePathMap.put(partitionIndexBucket, Pair.create(entry.getKey(), entry.getValue())); - } - - return filePathMap; - } -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/MiniLoadPendingTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/MiniLoadPendingTask.java deleted file mode 100644 index f35226b731..0000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/task/MiniLoadPendingTask.java +++ /dev/null @@ -1,216 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.task; - -import org.apache.doris.analysis.DescriptorTable; -import org.apache.doris.analysis.SlotDescriptor; -import org.apache.doris.analysis.TupleDescriptor; -import org.apache.doris.catalog.Catalog; -import org.apache.doris.catalog.Column; -import org.apache.doris.catalog.OlapTable; -import org.apache.doris.common.LoadException; -import org.apache.doris.common.Pair; -import org.apache.doris.common.UserException; -import org.apache.doris.load.EtlSubmitResult; -import org.apache.doris.load.LoadErrorHub; -import org.apache.doris.load.LoadJob; -import org.apache.doris.load.MiniEtlTaskInfo; -import org.apache.doris.load.TableLoadInfo; -import org.apache.doris.planner.CsvScanNode; -import org.apache.doris.planner.DataPartition; -import org.apache.doris.planner.DataSplitSink; -import org.apache.doris.planner.PlanFragment; -import org.apache.doris.planner.PlanFragmentId; -import org.apache.doris.planner.PlanNodeId; -import org.apache.doris.planner.ScanNode; -import org.apache.doris.system.Backend; -import org.apache.doris.thrift.PaloInternalServiceVersion; -import org.apache.doris.thrift.TAgentResult; -import org.apache.doris.thrift.TAgentServiceVersion; -import org.apache.doris.thrift.TExecPlanFragmentParams; -import org.apache.doris.thrift.TLoadErrorHubInfo; -import org.apache.doris.thrift.TMiniLoadEtlTaskRequest; -import org.apache.doris.thrift.TPlanFragmentExecParams; -import org.apache.doris.thrift.TQueryOptions; -import org.apache.doris.thrift.TQueryType; -import org.apache.doris.thrift.TStatus; -import org.apache.doris.thrift.TStatusCode; -import org.apache.doris.thrift.TUniqueId; - -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; - -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -import java.util.List; -import java.util.Map; - -public class MiniLoadPendingTask extends LoadPendingTask { - private static final Logger LOG = LogManager.getLogger(MiniLoadPendingTask.class); - - // descriptor used to register all column and table need - private final DescriptorTable desc; - - // destination Db and table get from request - // Data will load to this table - private OlapTable destTable; - - // dest desc - private TupleDescriptor destTupleDesc; - - private 
DataSplitSink tableSink; - - private List> requests; - - public MiniLoadPendingTask(LoadJob job) { - super(job); - this.desc = new DescriptorTable(); - } - - @Override - protected void createEtlRequest() throws Exception { - requests = Lists.newArrayList(); - for (MiniEtlTaskInfo taskInfo : job.getMiniEtlTasks().values()) { - long taskId = taskInfo.getId(); - long backendId = taskInfo.getBackendId(); - long tableId = taskInfo.getTableId(); - - // All the following operation will process when destTable's read lock are hold. - destTable = (OlapTable) db.getTable(tableId); - if (destTable == null) { - throw new LoadException("table does not exist. id: " + tableId); - } - - destTable.readLock(); - try { - registerToDesc(); - tableSink = new DataSplitSink(destTable, destTupleDesc); - - // add schema hash to table load info - TableLoadInfo tableLoadInfo = job.getTableLoadInfo(destTable.getId()); - for (Map.Entry entry : destTable.getIndexIdToSchemaHash().entrySet()) { - tableLoadInfo.addIndexSchemaHash(entry.getKey(), entry.getValue()); - } - requests.add(new Pair(backendId, createRequest(taskId))); - } finally { - destTable.readUnlock(); - } - } - } - - @Override - protected EtlSubmitResult submitEtlJob(int retry) { - LOG.info("begin submit mini load etl job: {}", job); - - for (Pair pair : requests) { - long backendId = pair.first; - TMiniLoadEtlTaskRequest request = pair.second; - - Backend backend = Catalog.getCurrentSystemInfo().getBackend(backendId); - if (!Catalog.getCurrentSystemInfo().checkBackendAvailable(backendId)) { - String failMsg = "backend is null or is not alive"; - LOG.error(failMsg); - TStatus tStatus = new TStatus(TStatusCode.CANCELLED); - tStatus.setErrorMsgs(Lists.newArrayList(failMsg)); - return new EtlSubmitResult(tStatus, null); - } - - AgentClient client = new AgentClient(backend.getHost(), backend.getBePort()); - TAgentResult submitResult = client.submitEtlTask(request); - if (submitResult.getStatus().getStatusCode() != TStatusCode.OK) { - return new EtlSubmitResult(submitResult.getStatus(), null); - } - } - - return new EtlSubmitResult(new TStatus(TStatusCode.OK), null); - } - - private void registerToDesc() { - destTupleDesc = desc.createTupleDescriptor(); - destTupleDesc.setTable(destTable); - // Lock database and get its schema hash?? 
- // Make sure that import job has its corresponding schema - for (Column col : destTable.getBaseSchema()) { - SlotDescriptor slot = desc.addSlotDescriptor(destTupleDesc); - // All this slot is needed - slot.setIsMaterialized(true); - slot.setColumn(col); - if (true == col.isAllowNull()) { - slot.setIsNullable(true); - } else { - slot.setIsNullable(false); - } - } - } - - private TMiniLoadEtlTaskRequest createRequest(long taskId) throws LoadException { - ScanNode csvScanNode = new CsvScanNode(new PlanNodeId(0), destTupleDesc, destTable, job); - desc.computeMemLayout(); - try { - csvScanNode.finalize(null); - } catch (UserException e) { - LOG.warn("csvScanNode finalize failed[err={}]", e); - throw new LoadException("CSV scan finalize failed.", e); - } - PlanFragment fragment = new PlanFragment(new PlanFragmentId(0), csvScanNode, DataPartition.UNPARTITIONED); - fragment.setSink(tableSink); - - try { - fragment.finalize(null, false); - } catch (Exception e) { - LOG.info("fragment finalize failed.e = {}", e); - throw new LoadException("Fragment finalize failed.", e); - } - - TMiniLoadEtlTaskRequest request = new TMiniLoadEtlTaskRequest(); - request.setProtocolVersion(TAgentServiceVersion.V1); - TExecPlanFragmentParams params = new TExecPlanFragmentParams(); - params.setProtocolVersion(PaloInternalServiceVersion.V1); - params.setFragment(fragment.toThrift()); - params.setDescTbl(desc.toThrift()); - params.setImportLabel(job.getLabel()); - params.setDbName(db.getFullName()); - params.setLoadJobId(job.getId()); - - LoadErrorHub.Param param = load.getLoadErrorHubInfo(); - if (param != null) { - TLoadErrorHubInfo info = param.toThrift(); - if (info != null) { - params.setLoadErrorHubInfo(info); - } - } - - TPlanFragmentExecParams execParams = new TPlanFragmentExecParams(); - // Only use fragment id - TUniqueId uniqueId = new TUniqueId(job.getId(), taskId); - execParams.setQueryId(new TUniqueId(uniqueId)); - execParams.setFragmentInstanceId(uniqueId); - execParams.per_node_scan_ranges = Maps.newHashMap(); - execParams.per_exch_num_senders = Maps.newHashMap(); - execParams.destinations = Lists.newArrayList(); - params.setParams(execParams); - TQueryOptions queryOptions = new TQueryOptions(); - queryOptions.setQueryType(TQueryType.LOAD); - params.setQueryOptions(queryOptions); - request.setParams(params); - return request; - } - -} - diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/PullLoadEtlTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/PullLoadEtlTask.java deleted file mode 100644 index 154134cbd2..0000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/task/PullLoadEtlTask.java +++ /dev/null @@ -1,139 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -package org.apache.doris.task; - -import org.apache.doris.catalog.Catalog; -import org.apache.doris.common.LoadException; -import org.apache.doris.common.Pair; -import org.apache.doris.load.EtlStatus; -import org.apache.doris.load.LoadJob; -import org.apache.doris.thrift.TEtlState; - -import com.google.common.collect.Maps; - -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -import java.util.Map; - -// Used to process pull load etl task -@Deprecated -public class PullLoadEtlTask extends LoadEtlTask { - private static final Logger LOG = LogManager.getLogger(PullLoadEtlTask.class); - private PullLoadJobMgr mgr; - - public PullLoadEtlTask(LoadJob job) { - super(job); - mgr = Catalog.getCurrentCatalog().getPullLoadJobMgr(); - } - - @Override - protected String getErrorMsg() { - String errMsg = null; - PullLoadJob pullLoadJob = mgr.getJob(job.getId()); - if (pullLoadJob != null) { - PullLoadTask failureTask = pullLoadJob.getFailureTask(); - if (failureTask != null) { - if (failureTask.getExecuteStatus() != null) { - errMsg = "Broker etl failed: " + failureTask.getExecuteStatus().getErrorMsg(); - } - } - } - return errMsg != null ? errMsg : super.getErrorMsg(); - } - - @Override - protected boolean updateJobEtlStatus() { - PullLoadJob pullLoadJob = mgr.getJob(job.getId()); - EtlStatus etlStatus = job.getEtlJobStatus(); - if (pullLoadJob == null) { - LOG.warn("pullLoadJob is null. JobId is {}", job.getId()); - return false; - } - switch (pullLoadJob.getState()) { - case CANCELED: - case FAILED: - etlStatus.setState(TEtlState.CANCELLED); - break; - case FINISHED: - updateFinishInfo(pullLoadJob); - etlStatus.setState(TEtlState.FINISHED); - break; - case RUNNING: - etlStatus.setState(TEtlState.RUNNING); - break; - default: - etlStatus.setState(TEtlState.UNKNOWN); - break; - } - return true; - } - - private void updateFinishInfo(PullLoadJob pullLoadJob) { - Map fileMap = Maps.newHashMap(); - long numRowsNormal = 0; - long numRowsAbnormal = 0; - String trackingUrl = null; - for (PullLoadTask task : pullLoadJob.tasks) { - fileMap.putAll(task.getFileMap()); - - String value = task.getCounters().get(DPP_NORMAL_ALL); - if (value != null) { - numRowsNormal += Long.valueOf(value); - } - value = task.getCounters().get(DPP_ABNORMAL_ALL); - if (value != null) { - numRowsAbnormal += Long.valueOf(value); - } - if (trackingUrl == null && task.getTrackingUrl() != null) { - trackingUrl = task.getTrackingUrl(); - } - } - Map counters = Maps.newHashMap(); - counters.put(DPP_NORMAL_ALL, "" + numRowsNormal); - counters.put(DPP_ABNORMAL_ALL, "" + numRowsAbnormal); - - EtlStatus etlJobStatus = job.getEtlJobStatus(); - etlJobStatus.setFileMap(fileMap); - etlJobStatus.setCounters(counters); - if (trackingUrl != null) { - etlJobStatus.setTrackingUrl(trackingUrl); - } - } - - @Override - protected void processEtlRunning() throws LoadException { - } - - @Override - protected Map> getFilePathMap() throws LoadException { - Map fileMap = job.getEtlJobStatus().getFileMap(); - if (fileMap == null) { - throw new LoadException("get etl files error"); - } - - Map> filePathMap = Maps.newHashMap(); - for (Map.Entry entry : fileMap.entrySet()) { - String partitionIndexBucket = getPartitionIndexBucketString(entry.getKey()); - // http://host:8000/data/dir/file - filePathMap.put(partitionIndexBucket, Pair.create(entry.getKey(), entry.getValue())); - } - - return filePathMap; - } -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/PullLoadJob.java 
b/fe/fe-core/src/main/java/org/apache/doris/task/PullLoadJob.java deleted file mode 100644 index fa2e520a9e..0000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/task/PullLoadJob.java +++ /dev/null @@ -1,110 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.task; - -import org.apache.doris.load.LoadJob; - -import com.google.common.collect.Sets; - -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -import java.util.List; -import java.util.Set; - -// One pull load job -@Deprecated -public class PullLoadJob { - private static final Logger LOG = LogManager.getLogger(PullLoadTask.class); - - public enum State { - RUNNING, - FINISHED, - CANCELED, - FAILED, - UNKNOWN; - - public boolean isRunning() { - return this == RUNNING; - } - } - - // Input params - public final LoadJob job; - public final List tasks; - - public Set finishedTask; - - // Used to - public volatile State state; - // Only used when this job has failed. - public PullLoadTask failureTask; - - public PullLoadJob(LoadJob job, List tasks) { - this.job = job; - this.tasks = tasks; - finishedTask = Sets.newHashSet(); - state = State.RUNNING; - } - - public long getId() { - return job.getId(); - } - - public synchronized State getState() { - return state; - } - - public synchronized boolean isRunning() { - return state.isRunning(); - } - - public synchronized void cancel() { - state = State.CANCELED; - for (PullLoadTask task : tasks) { - task.cancel(); - } - } - - public PullLoadTask getFailureTask() { - return failureTask; - } - - public synchronized void onTaskFinished(PullLoadTask task) { - int taskId = task.taskId; - if (!state.isRunning()) { - LOG.info("Ignore task info after this job has been stable. taskId={}:{}", task.jobId, taskId); - return; - } - if (!finishedTask.add(taskId)) { - LOG.info("Receive duplicate task information. taskId={}:{}", task.jobId, taskId); - } - if (finishedTask.size() == tasks.size()) { - state = State.FINISHED; - } - } - - public synchronized void onTaskFailed(PullLoadTask task) { - if (!state.isRunning()) { - LOG.info("Ignore task info after this job has been stable. taskId={}:{}", task.jobId, task.taskId); - return; - } - state = State.FAILED; - failureTask = task; - } -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/PullLoadJobMgr.java b/fe/fe-core/src/main/java/org/apache/doris/task/PullLoadJobMgr.java deleted file mode 100644 index 50a80681ce..0000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/task/PullLoadJobMgr.java +++ /dev/null @@ -1,200 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. 
See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.task; - -import org.apache.doris.common.Status; -import org.apache.doris.common.ThreadPoolManager; -import org.apache.doris.common.UserException; -import org.apache.doris.thrift.TStatusCode; - -import com.google.common.collect.Maps; -import com.google.common.collect.Queues; - -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -import java.util.Map; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.locks.ReentrantLock; - -@Deprecated -public class PullLoadJobMgr { - private static final Logger LOG = LogManager.getLogger(PullLoadJobMgr.class); - - // Lock protect - private final ReentrantLock lock = new ReentrantLock(); - private final Map idToJobs = Maps.newHashMap(); - - private final BlockingQueue pendingTasks = Queues.newLinkedBlockingQueue(); - private ExecutorService executorService; - - private int concurrency = 10; - - public PullLoadJobMgr(boolean needRegisterMetric) { - executorService = ThreadPoolManager.newDaemonCacheThreadPool(concurrency, "pull-load-job-mgr", needRegisterMetric); - } - - /** - * Start Task manager to work. - * First it will ask all backends to collect task status - * After collected, this will start scheduler to work. - */ - public void start() { - for (int i = 0; i < concurrency; ++i) { - executorService.submit(new TaskExecutor()); - } - } - - /** - * Submit a load job. - * This is called when a job turn from pending to ETL. - * Or this can be called before 'start' called. 
- */ - public void submit(PullLoadJob job) { - lock.lock(); - try { - if (idToJobs.containsKey(job.getId())) { - // Same job id contains - return; - } - idToJobs.put(job.getId(), job); - for (PullLoadTask task : job.tasks) { - pendingTasks.add(task); - } - } finally { - lock.unlock(); - } - } - - public PullLoadJob.State getJobState(long jobId) { - lock.lock(); - try { - PullLoadJob ctx = idToJobs.get(jobId); - if (ctx == null) { - return PullLoadJob.State.UNKNOWN; - } - return ctx.state; - } finally { - lock.unlock(); - } - } - - // NOTE: - // Must call this after job's state is not running - // Otherwise, the returned object will be process concurrently, - // this could be lead to an invalid situation - public PullLoadJob getJob(long jobId) { - lock.lock(); - try { - return idToJobs.get(jobId); - } finally { - lock.unlock(); - } - } - - // Cancel one job, remove all its tasks - public void cancelJob(long jobId) { - PullLoadJob job = null; - lock.lock(); - try { - job = idToJobs.remove(jobId); - } finally { - lock.unlock(); - } - // Cancel job out of lock guard - if (job != null) { - job.cancel(); - } - } - - // Only used when master replay job - public void remove(long jobId) { - lock.lock(); - try { - idToJobs.remove(jobId); - } finally { - lock.unlock(); - } - } - - public boolean isFailureCanRetry(Status status) { - return true; - } - - public class TaskExecutor implements Runnable { - - private void processOneTask(PullLoadTask task, PullLoadJob job) throws UserException { - int retryTime = 3; - for (int i = 0; i < retryTime; ++i) { - if (!job.isRunning()) { - throw new UserException("Job has been cancelled."); - } - task.executeOnce(); - if (task.isFinished()) { - return; - } else { - boolean needRetry = isFailureCanRetry(task.getExecuteStatus()); - if (!needRetry) { - break; - } - try { - Thread.sleep(5000); - } catch (InterruptedException e) { - - } - } - } - } - - @Override - public void run() { - while (true) { - PullLoadTask task; - PullLoadJob job; - try { - task = pendingTasks.take(); - if (task == null) { - continue; - } - job = getJob(task.jobId); - if (job == null || !job.isRunning()) { - LOG.info("Job is not running now. taskId={}:{}", task.jobId, task.taskId); - continue; - } - } catch (InterruptedException e) { - LOG.info("Interrupted when take task."); - continue; - } - try { - processOneTask(task, job); - if (task.isFinished()) { - job.onTaskFinished(task); - } else { - job.onTaskFailed(task); - } - } catch (Throwable e) { - LOG.warn("Process one pull load task exception. job id: {}, task id: {}", - task.jobId, task.taskId, e); - task.onFailed(null, new Status(TStatusCode.INTERNAL_ERROR, e.getMessage())); - job.onTaskFailed(task); - } - } - } - } -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/PullLoadTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/PullLoadTask.java deleted file mode 100644 index 5a44f223cf..0000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/task/PullLoadTask.java +++ /dev/null @@ -1,237 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. 
You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package org.apache.doris.task; - -import org.apache.doris.analysis.BrokerDesc; -import org.apache.doris.catalog.Database; -import org.apache.doris.catalog.OlapTable; -import org.apache.doris.common.Config; -import org.apache.doris.common.Status; -import org.apache.doris.common.UserException; -import org.apache.doris.common.util.DebugUtil; -import org.apache.doris.common.util.TimeUtils; -import org.apache.doris.load.BrokerFileGroup; -import org.apache.doris.qe.Coordinator; -import org.apache.doris.qe.QeProcessorImpl; -import org.apache.doris.thrift.TBrokerFileStatus; -import org.apache.doris.thrift.TQueryType; -import org.apache.doris.thrift.TStatusCode; -import org.apache.doris.thrift.TUniqueId; - -import com.google.common.collect.Maps; - -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - -import java.util.List; -import java.util.Map; -import java.util.UUID; - -// A pull load task is used to process one table of this pull load job. -@Deprecated -public class PullLoadTask { - private static final Logger LOG = LogManager.getLogger(PullLoadTask.class); - // Input parameter - public final long jobId; - public final int taskId; - public final Database db; - public final OlapTable table; - public final BrokerDesc brokerDesc; - public final List fileGroups; - public final long jobDeadlineMs; - - private PullLoadTaskPlanner planner; - - // Useful things after executed - private Map fileMap; - private String trackingUrl; - private Map counters; - private final long execMemLimit; - - // Runtime variables - private enum State { - RUNNING, - FINISHED, - FAILED, - CANCELLED, - } - - private TUniqueId queryId; - private Coordinator curCoordinator; - private State executeState = State.RUNNING; - private Status executeStatus; - private Thread curThread; - - public PullLoadTask( - long jobId, int taskId, - Database db, OlapTable table, - BrokerDesc brokerDesc, List fileGroups, - long jobDeadlineMs, long execMemLimit) { - this.jobId = jobId; - this.taskId = taskId; - this.db = db; - this.table = table; - this.brokerDesc = brokerDesc; - this.fileGroups = fileGroups; - this.jobDeadlineMs = jobDeadlineMs; - this.execMemLimit = execMemLimit; - } - - public void init(List> fileStatusList, int fileNum) throws UserException { - planner = new PullLoadTaskPlanner(this); - planner.plan(fileStatusList, fileNum); - } - - public Map getFileMap() { - return fileMap; - } - - public String getTrackingUrl() { - return trackingUrl; - } - - public Map getCounters() { - return counters; - } - - private long getLeftTimeMs() { - if (jobDeadlineMs <= 0) { - return Config.broker_load_default_timeout_second * 1000; - } - return jobDeadlineMs - System.currentTimeMillis(); - } - - public synchronized void cancel() { - if (curCoordinator != null) { - curCoordinator.cancel(); - } - } - - public synchronized boolean isFinished() { - return executeState == State.FINISHED; - } - - public Status getExecuteStatus() { - return executeStatus; - } - - public synchronized void onCancelled(String reason) { - if (executeState == State.RUNNING) { - executeState = State.CANCELLED; - 
executeStatus = Status.CANCELLED; - LOG.info("cancel one pull load task({}). task id: {}, query id: {}, job id: {}", - reason, taskId, DebugUtil.printId(curCoordinator.getQueryId()), jobId); - } - } - - public synchronized void onFinished(Map fileMap, - Map counters, - String trackingUrl) { - if (executeState == State.RUNNING) { - executeState = State.FINISHED; - - executeStatus = Status.OK; - this.fileMap = fileMap; - this.counters = counters; - this.trackingUrl = trackingUrl; - LOG.info("finished one pull load task. task id: {}, query id: {}, job id: {}", - taskId, DebugUtil.printId(curCoordinator.getQueryId()), jobId); - } - } - - public synchronized void onFailed(TUniqueId id, Status failStatus) { - if (executeState == State.RUNNING) { - if (id != null && !queryId.equals(id)) { - return; - } - executeState = State.FAILED; - executeStatus = failStatus; - LOG.info("failed one pull load task({}). task id: {}, query id: {}, job id: {}", - failStatus.getErrorMsg(), taskId, id != null ? DebugUtil.printId(id) : "NaN", jobId); - } - } - - private void actualExecute() { - int waitSecond = (int) (getLeftTimeMs() / 1000); - if (waitSecond <= 0) { - onCancelled("waiting timeout"); - return; - } - - // TODO(zc): to refine coordinator - try { - curCoordinator.exec(); - } catch (Exception e) { - LOG.warn("pull load task exec failed", e); - onFailed(queryId, new Status(TStatusCode.INTERNAL_ERROR, "Coordinator execute failed: " + e.getMessage())); - return; - } - - if (curCoordinator.join(waitSecond)) { - Status status = curCoordinator.getExecStatus(); - if (status.ok()) { - Map resultFileMap = Maps.newHashMap(); - for (String file : curCoordinator.getDeltaUrls()) { - resultFileMap.put(file, -1L); - } - onFinished(resultFileMap, curCoordinator.getLoadCounters(), curCoordinator.getTrackingUrl()); - } else { - onFailed(queryId, status); - } - } else { - onCancelled("execution timeout"); - } - } - - public void executeOnce() throws UserException { - synchronized (this) { - if (curThread != null) { - throw new UserException("Task already executing."); - } - curThread = Thread.currentThread(); - executeState = State.RUNNING; - executeStatus = Status.OK; - - // New one query id, - UUID uuid = UUID.randomUUID(); - queryId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits()); - curCoordinator = new Coordinator(jobId, queryId, planner.getDescTable(), - planner.getFragments(), planner.getScanNodes(), db.getClusterName(), TimeUtils.DEFAULT_TIME_ZONE); - curCoordinator.setQueryType(TQueryType.LOAD); - curCoordinator.setExecMemoryLimit(execMemLimit); - curCoordinator.setTimeout((int) (getLeftTimeMs() / 1000)); - } - - boolean needUnregister = false; - try { - QeProcessorImpl.INSTANCE.registerQuery(queryId, curCoordinator); - actualExecute(); - needUnregister = true; - } catch (UserException e) { - onFailed(queryId, new Status(TStatusCode.INTERNAL_ERROR, e.getMessage())); - } finally { - if (needUnregister) { - QeProcessorImpl.INSTANCE.unregisterQuery(queryId); - } - synchronized (this) { - curThread = null; - curCoordinator = null; - } - } - } -} diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/PullLoadTaskPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/task/PullLoadTaskPlanner.java deleted file mode 100644 index 251ee2de84..0000000000 --- a/fe/fe-core/src/main/java/org/apache/doris/task/PullLoadTaskPlanner.java +++ /dev/null @@ -1,160 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/PullLoadTaskPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/task/PullLoadTaskPlanner.java
deleted file mode 100644
index 251ee2de84..0000000000
--- a/fe/fe-core/src/main/java/org/apache/doris/task/PullLoadTaskPlanner.java
+++ /dev/null
@@ -1,160 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.task;
-
-import org.apache.doris.analysis.Analyzer;
-import org.apache.doris.analysis.DescriptorTable;
-import org.apache.doris.analysis.Expr;
-import org.apache.doris.analysis.SlotDescriptor;
-import org.apache.doris.analysis.SlotRef;
-import org.apache.doris.analysis.TupleDescriptor;
-import org.apache.doris.catalog.Catalog;
-import org.apache.doris.catalog.Column;
-import org.apache.doris.catalog.OlapTable;
-import org.apache.doris.common.AnalysisException;
-import org.apache.doris.common.NotImplementedException;
-import org.apache.doris.common.UserException;
-import org.apache.doris.planner.BrokerScanNode;
-import org.apache.doris.planner.DataPartition;
-import org.apache.doris.planner.DataSplitSink;
-import org.apache.doris.planner.ExchangeNode;
-import org.apache.doris.planner.OlapRewriteNode;
-import org.apache.doris.planner.PlanFragment;
-import org.apache.doris.planner.PlanFragmentId;
-import org.apache.doris.planner.PlanNodeId;
-import org.apache.doris.planner.ScanNode;
-import org.apache.doris.thrift.TBrokerFileStatus;
-
-import com.google.common.collect.Lists;
-
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-import java.util.Collections;
-import java.util.List;
-
-// Planner used to generate a plan for pull load ETL work
-@Deprecated
-public class PullLoadTaskPlanner {
-    private static final Logger LOG = LogManager.getLogger(PullLoadTaskPlanner.class);
-
-    // Input param
-    private final PullLoadTask task;
-
-    // Something useful
-    private final Analyzer analyzer;
-    private final DescriptorTable descTable;
-
-    // Output param
-    private final List<PlanFragment> fragments;
-    private final List<ScanNode> scanNodes;
-
-    private int nextNodeId = 0;
-
-    public PullLoadTaskPlanner(PullLoadTask task) {
-        this.task = task;
-        this.analyzer = new Analyzer(Catalog.getCurrentCatalog(), null);
-        this.descTable = analyzer.getDescTbl();
-        this.fragments = Lists.newArrayList();
-        this.scanNodes = Lists.newArrayList();
-    }
-
-    // NOTE: DB lock need hold when call this function.
-    public void plan(List<List<TBrokerFileStatus>> fileStatusesList, int filesAdded) throws UserException {
-        // Tuple descriptor used for all nodes in plan.
-        OlapTable table = task.table;
-
-        // Generate tuple descriptor
-        List<Expr> slotRefs = Lists.newArrayList();
-        TupleDescriptor tupleDesc = descTable.createTupleDescriptor();
-        for (Column col : table.getFullSchema()) {
-            SlotDescriptor slotDesc = descTable.addSlotDescriptor(tupleDesc);
-            slotDesc.setIsMaterialized(true);
-            slotDesc.setColumn(col);
-            if (col.isAllowNull()) {
-                slotDesc.setIsNullable(true);
-            } else {
-                slotDesc.setIsNullable(false);
-            }
-            slotRefs.add(new SlotRef(slotDesc));
-        }
-
-        // Generate plan tree
-        // 1. first Scan node
-        BrokerScanNode scanNode = new BrokerScanNode(new PlanNodeId(nextNodeId++), tupleDesc, "BrokerScanNode",
-                fileStatusesList, filesAdded);
-        scanNode.setLoadInfo(table, task.brokerDesc, task.fileGroups);
-        scanNode.init(analyzer);
-        scanNodes.add(scanNode);
-
-        // equal node
-        OlapRewriteNode rewriteNode = new OlapRewriteNode(
-                new PlanNodeId(nextNodeId++), scanNode, table, tupleDesc, slotRefs);
-        rewriteNode.init(analyzer);
-
-        descTable.computeMemLayout();
-        rewriteNode.finalize(analyzer);
-
-        PlanFragment scanFragment = new PlanFragment(new PlanFragmentId(0), rewriteNode, DataPartition.RANDOM);
-        scanNode.setFragmentId(scanFragment.getFragmentId());
-        scanNode.setFragment(scanFragment);
-        fragments.add(scanFragment);
-
-        // exchange node
-        ExchangeNode exchangeNode = new ExchangeNode(new PlanNodeId(nextNodeId++), rewriteNode, false);
-        exchangeNode.init(analyzer);
-
-        // Create data sink
-        DataSplitSink splitSink = null;
-        try {
-            splitSink = new DataSplitSink(table, tupleDesc);
-        } catch (AnalysisException e) {
-            LOG.info("New DataSplitSink failed.{}", e);
-            throw new UserException(e.getMessage());
-        }
-        PlanFragment sinkFragment = new PlanFragment(new PlanFragmentId(1), exchangeNode, splitSink.getOutputPartition());
-        scanFragment.setDestination(exchangeNode);
-        scanFragment.setOutputPartition(splitSink.getOutputPartition());
-        sinkFragment.setSink(splitSink);
-
-        fragments.add(sinkFragment);
-
-        // Get partition
-        for (PlanFragment fragment : fragments) {
-            try {
-                fragment.finalize(analyzer, false);
-            } catch (NotImplementedException e) {
-                LOG.info("Fragment finalize failed.{}", e);
-                throw new UserException("Fragment finalize failed.");
-            }
-        }
-        Collections.reverse(fragments);
-    }
-
-    public DescriptorTable getDescTable() {
-        return descTable;
-    }
-
-    public List<PlanFragment> getFragments() {
-        return fragments;
-    }
-
-    public List<ScanNode> getScanNodes() {
-        return scanNodes;
-    }
-}
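The removed planner always produced the same two-fragment shape: fragment 0 scans broker files (BrokerScanNode) and rewrites rows to the target schema (OlapRewriteNode), then streams them to fragment 1, whose ExchangeNode feeds the DataSplitSink that writes the ETL output; the final Collections.reverse puts the sink-side fragment first, presumably the order the coordinator expects. A rough model of that shape under those assumptions follows; the Fragment record and all names are hypothetical, not Doris planner classes.

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    public class PullLoadPlanShape {
        // Illustrative stand-in for a plan fragment: its id, root plan node, and sink.
        record Fragment(int id, String rootNode, String sink) {}

        public static void main(String[] args) {
            List<Fragment> fragments = new ArrayList<>();
            // Fragment 0: scan + rewrite, randomly partitioned, sends to fragment 1's exchange.
            fragments.add(new Fragment(0, "OlapRewriteNode(BrokerScanNode)", "exchange to fragment 1"));
            // Fragment 1: exchange feeding the split sink that writes the ETL output.
            fragments.add(new Fragment(1, "ExchangeNode", "DataSplitSink"));
            // Mirror plan(): reverse so the sink-side fragment comes first.
            Collections.reverse(fragments);
            fragments.forEach(System.out::println);
        }
    }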