diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowBrokerStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowBrokerStmt.java index 065a360a57..9fe144bee4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowBrokerStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ShowBrokerStmt.java @@ -45,6 +45,10 @@ public class ShowBrokerStmt extends ShowStmt { public ShowResultSetMetaData getMetaData() { ShowResultSetMetaData.Builder builder = ShowResultSetMetaData.builder(); for (String title : BrokerMgr.BROKER_PROC_NODE_TITLE_NAMES) { + if (title.equals(BrokerMgr.BROKER_PROC_NODE_TITLE_NAMES.get(BrokerMgr.HOSTNAME_INDEX))) { + // SHOW BROKER does not show hostname + continue; + } builder.addColumn(new Column(title, ScalarType.createVarchar(30))); } return builder.build(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/FsBroker.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/FsBroker.java index a7fffe2ae4..90b0cc69cf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/FsBroker.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/FsBroker.java @@ -63,7 +63,7 @@ public class FsBroker implements Writable, Comparable { isAlive = true; isChanged = true; lastStartTime = hbResponse.getHbTime(); - } else if (lastStartTime == -1) { + } else if (lastStartTime <= 0) { lastStartTime = hbResponse.getHbTime(); } lastUpdateTime = hbResponse.getHbTime(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java index db7b958b49..126fae51ca 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/Load.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/Load.java @@ -22,7 +22,6 @@ import org.apache.doris.analysis.Analyzer; import org.apache.doris.analysis.BinaryPredicate; import org.apache.doris.analysis.CancelLoadStmt; import org.apache.doris.analysis.CastExpr; -import org.apache.doris.analysis.Separator; import org.apache.doris.analysis.DataDescription; import org.apache.doris.analysis.Expr; import org.apache.doris.analysis.ExprSubstitutionMap; @@ -35,6 +34,7 @@ import org.apache.doris.analysis.LabelName; import org.apache.doris.analysis.LoadStmt; import org.apache.doris.analysis.NullLiteral; import org.apache.doris.analysis.PartitionNames; +import org.apache.doris.analysis.Separator; import org.apache.doris.analysis.SlotDescriptor; import org.apache.doris.analysis.SlotRef; import org.apache.doris.analysis.StorageBackend; @@ -88,6 +88,7 @@ import org.apache.doris.qe.ConnectContext; import org.apache.doris.system.Backend; import org.apache.doris.task.AgentClient; import org.apache.doris.task.AgentTaskQueue; +import org.apache.doris.task.LoadTaskInfo; import org.apache.doris.task.PushTask; import org.apache.doris.thrift.TBrokerScanRangeParams; import org.apache.doris.thrift.TEtlState; @@ -96,6 +97,11 @@ import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TPriority; import org.apache.doris.transaction.TransactionNotFoundException; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + import com.google.common.base.Preconditions; import com.google.common.base.Strings; import com.google.common.collect.Lists; @@ -103,11 +109,6 @@ import com.google.common.collect.Maps; import com.google.common.collect.Sets; import com.google.gson.Gson; -import org.apache.commons.collections.CollectionUtils; -import org.apache.commons.lang.StringUtils; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; - import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -924,12 +925,12 @@ public class Load { * This function should be used for broker load v2 and stream load. * And it must be called in same db lock when planing. */ - public static void initColumns(Table tbl, List columnExprs, + public static void initColumns(Table tbl, LoadTaskInfo.ImportColumnDescs columnDescs, Map>> columnToHadoopFunction, Map exprsByName, Analyzer analyzer, TupleDescriptor srcTupleDesc, Map slotDescByName, TBrokerScanRangeParams params) throws UserException { - rewriteColumns(columnExprs); - initColumns(tbl, columnExprs, columnToHadoopFunction, exprsByName, analyzer, + rewriteColumns(columnDescs); + initColumns(tbl, columnDescs.descs, columnToHadoopFunction, exprsByName, analyzer, srcTupleDesc, slotDescByName, params, true); } @@ -1120,12 +1121,16 @@ public class Load { LOG.debug("after init column, exprMap: {}", exprsByName); } - public static void rewriteColumns(List columnExprs) { + public static void rewriteColumns(LoadTaskInfo.ImportColumnDescs columnDescs) { + if (columnDescs.isColumnDescsRewrited) { + return; + } + Map derivativeColumns = new HashMap<>(); // find and rewrite the derivative columns // e.g. (v1,v2=v1+1,v3=v2+1) --> (v1, v2=v1+1, v3=v1+1+1) // 1. find the derivative columns - for (ImportColumnDesc importColumnDesc : columnExprs) { + for (ImportColumnDesc importColumnDesc : columnDescs.descs) { if (!importColumnDesc.isColumn()) { if (importColumnDesc.getExpr() instanceof SlotRef) { String columnName = ((SlotRef) importColumnDesc.getExpr()).getColumnName(); @@ -1139,6 +1144,7 @@ public class Load { } } + columnDescs.isColumnDescsRewrited = true; } private static void recursiveRewrite(Expr expr, Map derivativeColumns) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java index 812c053773..e946883d8b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java @@ -18,13 +18,13 @@ package org.apache.doris.load.routineload; import org.apache.doris.analysis.AlterRoutineLoadStmt; -import org.apache.doris.analysis.Separator; import org.apache.doris.analysis.CreateRoutineLoadStmt; import org.apache.doris.analysis.Expr; import org.apache.doris.analysis.ImportColumnDesc; import org.apache.doris.analysis.ImportColumnsStmt; import org.apache.doris.analysis.LoadStmt; import org.apache.doris.analysis.PartitionNames; +import org.apache.doris.analysis.Separator; import org.apache.doris.analysis.SqlParser; import org.apache.doris.analysis.SqlScanner; import org.apache.doris.catalog.Catalog; @@ -90,7 +90,6 @@ import java.util.Optional; import java.util.Queue; import java.util.UUID; import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.stream.Collectors; /** * Routine load job is a function which stream load data from streaming medium to doris. @@ -155,7 +154,7 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl protected long authCode; // protected RoutineLoadDesc routineLoadDesc; // optional protected PartitionNames partitions; // optional - protected List columnDescs; // optional + protected ImportColumnDescs columnDescs; // optional protected Expr precedingFilter; // optional protected Expr whereExpr; // optional protected Separator columnSeparator; // optional @@ -345,12 +344,12 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl private void setRoutineLoadDesc(RoutineLoadDesc routineLoadDesc) { if (routineLoadDesc != null) { - columnDescs = Lists.newArrayList(); + columnDescs = new ImportColumnDescs(); if (routineLoadDesc.getColumnsInfo() != null) { ImportColumnsStmt columnsStmt = routineLoadDesc.getColumnsInfo(); if (columnsStmt.getColumns() != null || columnsStmt.getColumns().size() != 0) { for (ImportColumnDesc columnDesc : columnsStmt.getColumns()) { - columnDescs.add(columnDesc); + columnDescs.descs.add(columnDesc); } } } @@ -579,12 +578,11 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl } @Override - public List getColumnExprDescs() { + public ImportColumnDescs getColumnExprDescs() { if (columnDescs == null) { - return new ArrayList<>(); + return new ImportColumnDescs(); } - // use the copy of columnDescs avoid duplicated add delete condition - return columnDescs.stream().collect(Collectors.toList()); + return columnDescs; } public String getJsonPaths() { @@ -1351,7 +1349,7 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl private String jobPropertiesToJsonString() { Map jobProperties = Maps.newHashMap(); jobProperties.put("partitions", partitions == null ? STAR_STRING : Joiner.on(",").join(partitions.getPartitionNames())); - jobProperties.put("columnToColumnExpr", columnDescs == null ? STAR_STRING : Joiner.on(",").join(columnDescs)); + jobProperties.put("columnToColumnExpr", columnDescs == null ? STAR_STRING : Joiner.on(",").join(columnDescs.descs)); jobProperties.put("precedingFilter", precedingFilter == null ? STAR_STRING : precedingFilter.toSql()); jobProperties.put("whereExpr", whereExpr == null ? STAR_STRING : whereExpr.toSql()); if (getFormat().equalsIgnoreCase("json")) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java index 7c5c65133e..8b1a6c31a9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java @@ -40,6 +40,7 @@ import org.apache.doris.load.BrokerFileGroup; import org.apache.doris.load.Load; import org.apache.doris.load.loadv2.LoadTask; import org.apache.doris.system.Backend; +import org.apache.doris.task.LoadTaskInfo; import org.apache.doris.thrift.TBrokerFileStatus; import org.apache.doris.thrift.TBrokerRangeDesc; import org.apache.doris.thrift.TBrokerScanRange; @@ -230,22 +231,22 @@ public class BrokerScanNode extends LoadScanNode { // for load job, column exprs is got from file group // for query, there is no column exprs, they will be got from table's schema in "Load.initColumns" - List columnExprs = Lists.newArrayList(); + LoadTaskInfo.ImportColumnDescs columnDescs = new LoadTaskInfo.ImportColumnDescs(); if (isLoad()) { - columnExprs = context.fileGroup.getColumnExprList(); + columnDescs.descs = context.fileGroup.getColumnExprList(); if (mergeType == LoadTask.MergeType.MERGE) { - columnExprs.add(ImportColumnDesc.newDeleteSignImportColumnDesc(deleteCondition)); + columnDescs.descs.add(ImportColumnDesc.newDeleteSignImportColumnDesc(deleteCondition)); } else if (mergeType == LoadTask.MergeType.DELETE) { - columnExprs.add(ImportColumnDesc.newDeleteSignImportColumnDesc(new IntLiteral(1))); + columnDescs.descs.add(ImportColumnDesc.newDeleteSignImportColumnDesc(new IntLiteral(1))); } // add columnExpr for sequence column if (context.fileGroup.hasSequenceCol()) { - columnExprs.add(new ImportColumnDesc(Column.SEQUENCE_COL, + columnDescs.descs.add(new ImportColumnDesc(Column.SEQUENCE_COL, new SlotRef(null, context.fileGroup.getSequenceCol()))); } } - Load.initColumns(targetTable, columnExprs, + Load.initColumns(targetTable, columnDescs, context.fileGroup.getColumnToHadoopFunction(), context.exprMap, analyzer, context.srcTupleDescriptor, context.slotDescByName, context.params); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java index edb935dc7f..b5e2e0dd60 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadScanNode.java @@ -39,12 +39,12 @@ import org.apache.doris.thrift.TScanRange; import org.apache.doris.thrift.TScanRangeLocations; import org.apache.doris.thrift.TUniqueId; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; - import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + import java.nio.charset.Charset; import java.util.List; import java.util.Map; @@ -123,14 +123,16 @@ public class StreamLoadScanNode extends LoadScanNode { srcTupleDesc = analyzer.getDescTbl().createTupleDescriptor("StreamLoadScanNode"); TBrokerScanRangeParams params = new TBrokerScanRangeParams(); - List columnExprDescs = taskInfo.getColumnExprDescs(); - if (mergeType == LoadTask.MergeType.MERGE) { - columnExprDescs.add(ImportColumnDesc.newDeleteSignImportColumnDesc(deleteCondition)); - } else if (mergeType == LoadTask.MergeType.DELETE) { - columnExprDescs.add(ImportColumnDesc.newDeleteSignImportColumnDesc(new IntLiteral(1))); - } - if (taskInfo.hasSequenceCol()) { - columnExprDescs.add(new ImportColumnDesc(Column.SEQUENCE_COL, new SlotRef(null, taskInfo.getSequenceCol()))); + LoadTaskInfo.ImportColumnDescs columnExprDescs = taskInfo.getColumnExprDescs(); + if (!columnExprDescs.isColumnDescsRewrited) { + if (mergeType == LoadTask.MergeType.MERGE) { + columnExprDescs.descs.add(ImportColumnDesc.newDeleteSignImportColumnDesc(deleteCondition)); + } else if (mergeType == LoadTask.MergeType.DELETE) { + columnExprDescs.descs.add(ImportColumnDesc.newDeleteSignImportColumnDesc(new IntLiteral(1))); + } + if (taskInfo.hasSequenceCol()) { + columnExprDescs.descs.add(new ImportColumnDesc(Column.SEQUENCE_COL, new SlotRef(null, taskInfo.getSequenceCol()))); + } } Load.initColumns(dstTable, columnExprDescs, null /* no hadoop function */, diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java b/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java index c6dd020a39..a36b767f10 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/LoadTaskInfo.java @@ -17,14 +17,16 @@ package org.apache.doris.task; -import org.apache.doris.analysis.Separator; import org.apache.doris.analysis.Expr; import org.apache.doris.analysis.ImportColumnDesc; import org.apache.doris.analysis.PartitionNames; +import org.apache.doris.analysis.Separator; import org.apache.doris.load.loadv2.LoadTask; import org.apache.doris.thrift.TFileFormatType; import org.apache.doris.thrift.TFileType; +import com.google.common.collect.Lists; + import java.util.List; public interface LoadTaskInfo { @@ -46,11 +48,17 @@ public interface LoadTaskInfo { public boolean isFuzzyParse(); public boolean isNumAsString(); public String getPath(); - public List getColumnExprDescs(); + + public ImportColumnDescs getColumnExprDescs(); public boolean isStrictMode(); public Expr getPrecedingFilter(); public Expr getWhereExpr(); public Separator getColumnSeparator(); public Separator getLineDelimiter(); + + public static class ImportColumnDescs { + public List descs = Lists.newArrayList(); + public boolean isColumnDescsRewrited = false; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java index 57d07f4d9b..01d852fe6a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java @@ -19,7 +19,6 @@ package org.apache.doris.task; import org.apache.doris.analysis.Separator; import org.apache.doris.analysis.Expr; -import org.apache.doris.analysis.ImportColumnDesc; import org.apache.doris.analysis.ImportColumnsStmt; import org.apache.doris.analysis.ImportWhereStmt; import org.apache.doris.analysis.PartitionNames; @@ -43,7 +42,6 @@ import com.google.common.base.Strings; import com.google.common.collect.Lists; import java.io.StringReader; -import java.util.List; public class StreamLoadTask implements LoadTaskInfo { @@ -60,7 +58,7 @@ public class StreamLoadTask implements LoadTaskInfo { private boolean fuzzyParse; // optional - private List columnExprDescs = Lists.newArrayList(); + private ImportColumnDescs columnExprDescs = new ImportColumnDescs(); private Expr whereExpr; private Separator columnSeparator; private Separator lineDelimiter; @@ -103,7 +101,7 @@ public class StreamLoadTask implements LoadTaskInfo { return formatType; } - public List getColumnExprDescs() { + public ImportColumnDescs getColumnExprDescs() { return columnExprDescs; } @@ -309,7 +307,7 @@ public class StreamLoadTask implements LoadTaskInfo { } if (columnsStmt.getColumns() != null && !columnsStmt.getColumns().isEmpty()) { - columnExprDescs = columnsStmt.getColumns(); + columnExprDescs.descs = columnsStmt.getColumns(); } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/LoadStmtTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/LoadStmtTest.java index ba83147329..aee6eed42e 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/analysis/LoadStmtTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/LoadStmtTest.java @@ -30,18 +30,16 @@ import org.apache.doris.load.loadv2.LoadTask; import org.apache.doris.mysql.privilege.PaloAuth; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.qe.ConnectContext; - -import com.google.common.collect.Lists; +import org.apache.doris.task.LoadTaskInfo; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import com.google.common.collect.Lists; + import java.io.StringReader; -import java.util.ArrayList; -import java.util.HashMap; import java.util.List; -import java.util.Map; import mockit.Expectations; import mockit.Injectable; @@ -143,21 +141,42 @@ public class LoadStmtTest { @Test public void testRewrite() throws Exception{ + LoadTaskInfo.ImportColumnDescs columnDescs = new LoadTaskInfo.ImportColumnDescs(); List columns1 = getColumns("c1,c2,c3,tmp_c4=c1 + 1, tmp_c5 = tmp_c4+1"); - Load.rewriteColumns(columns1); + columnDescs.descs = columns1; + columnDescs.isColumnDescsRewrited = false; + Load.rewriteColumns(columnDescs); String orig = "`c1` + 1 + 1"; Assert.assertEquals(orig, columns1.get(4).getExpr().toString()); List columns2 = getColumns("c1,c2,c3,tmp_c5 = tmp_c4+1, tmp_c4=c1 + 1"); + columnDescs.descs = columns2; + columnDescs.isColumnDescsRewrited = false; String orig2 = "`tmp_c4` + 1"; - Load.rewriteColumns(columns2); + Load.rewriteColumns(columnDescs); Assert.assertEquals(orig2, columns2.get(3).getExpr().toString()); List columns3 = getColumns("c1,c2,c3"); + columnDescs.descs = columns3; + columnDescs.isColumnDescsRewrited = false; String orig3 = "c3"; - Load.rewriteColumns(columns3); + Load.rewriteColumns(columnDescs); Assert.assertEquals(orig3, columns3.get(2).toString()); + List columns4 = getColumns("c1, c1=ifnull(c1, 0), c2=ifnull(c1, 0)"); + columnDescs.descs = columns4; + columnDescs.isColumnDescsRewrited = false; + Load.rewriteColumns(columnDescs); + Assert.assertEquals("c1", columns4.get(0).toString()); + Assert.assertEquals("c1=ifnull(`c1`, 0)", columns4.get(1).toString()); + Assert.assertEquals("c2=ifnull(ifnull(`c1`, 0), 0)", columns4.get(2).toString()); + // will not rewrite again + Assert.assertTrue(columnDescs.isColumnDescsRewrited); + Load.rewriteColumns(columnDescs); + Assert.assertEquals("c1", columns4.get(0).toString()); + Assert.assertEquals("c1=ifnull(`c1`, 0)", columns4.get(1).toString()); + Assert.assertEquals("c2=ifnull(ifnull(`c1`, 0), 0)", columns4.get(2).toString()); + } private List getColumns(String columns) throws Exception {