[Bug] Fix StackOverFlow bug after rewriting the column descs of load stmt (#5656)

1. Fix a self-referencing bug: rewriting the column descs of a load stmt more than once substituted derivative column expressions into themselves, eventually causing a StackOverflowError.
2. Also fix a display bug of SHOW BROKER: its result header included the hostname column, which the result rows do not show.
Mingyu Chen
2021-04-23 09:45:39 +08:00
committed by GitHub
parent 4fa25b6eb9
commit b12399657b
9 changed files with 90 additions and 54 deletions
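Root cause, in brief: `Load.rewriteColumns()` substitutes derivative column expressions into the `ImportColumnDesc` list in place (e.g. `(v1, v2=v1+1, v3=v2+1)` becomes `(v1, v2=v1+1, v3=v1+1+1)`). Stream load and routine load could run that rewrite more than once over the same list, so a definition that mentions its own column, such as `c1=ifnull(c1, 0)`, was substituted into itself on every pass; the expression tree grew without bound and the recursive rewrite eventually overflowed the stack. The fix wraps the list in a new `LoadTaskInfo.ImportColumnDescs` holder carrying an `isColumnDescsRewrited` flag so the rewrite runs at most once. Below is a minimal sketch of the growth mechanism (plain strings instead of Doris `Expr` trees; not project code):

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Minimal sketch (not Doris code): re-running an in-place
// "substitute derivative column definitions" rewrite is unsafe
// once a definition references its own column.
public class RepeatedRewriteDemo {
    public static void main(String[] args) {
        // Analogous to "c1=ifnull(c1, 0)" from the test in this commit:
        // the definition of c1 refers to c1 itself.
        Map<String, String> defs = new LinkedHashMap<>();
        defs.put("c1", "ifnull(c1, 0)");

        String expr = "ifnull(c1, 0)"; // a column expression referencing c1
        for (int pass = 1; pass <= 3; pass++) {
            // Each pass substitutes the definition of c1 into every
            // remaining reference to c1, so the nesting never stops growing.
            expr = expr.replace("c1", defs.get("c1"));
            System.out.println("pass " + pass + ": " + expr);
        }
        // pass 1: ifnull(ifnull(c1, 0), 0)
        // pass 2: ifnull(ifnull(ifnull(c1, 0), 0), 0)
        // On a real Expr tree, a recursive visitor over this ever-deeper
        // nesting is what throws StackOverflowError.
    }
}
```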

File: ShowBrokerStmt.java

@@ -45,6 +45,10 @@ public class ShowBrokerStmt extends ShowStmt {
public ShowResultSetMetaData getMetaData() {
ShowResultSetMetaData.Builder builder = ShowResultSetMetaData.builder();
for (String title : BrokerMgr.BROKER_PROC_NODE_TITLE_NAMES) {
if (title.equals(BrokerMgr.BROKER_PROC_NODE_TITLE_NAMES.get(BrokerMgr.HOSTNAME_INDEX))) {
// SHOW BROKER does not show hostname
continue;
}
builder.addColumn(new Column(title, ScalarType.createVarchar(30)));
}
return builder.build();
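This hunk keeps the SHOW BROKER header in sync with its rows: per the new comment, SHOW BROKER does not show the hostname, so the metadata builder now skips that title instead of emitting a header column with nothing under it. A hedged sketch of the invariant (titles and index are illustrative, not BrokerMgr's actual values):

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch: header columns must match row columns one-to-one,
// so a title omitted on the row side has to be omitted on the header side.
public class HeaderAlignmentDemo {
    public static void main(String[] args) {
        List<String> titles = List.of("Name", "IP", "HostName", "Port", "Alive");
        int hostNameIndex = 2; // plays the role of BrokerMgr.HOSTNAME_INDEX

        List<String> header = new ArrayList<>();
        for (String title : titles) {
            if (title.equals(titles.get(hostNameIndex))) {
                continue; // rows omit hostname, so the header must too
            }
            header.add(title);
        }
        System.out.println(header); // [Name, IP, Port, Alive]
    }
}
```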

File: FsBroker.java

@@ -63,7 +63,7 @@ public class FsBroker implements Writable, Comparable<FsBroker> {
isAlive = true;
isChanged = true;
lastStartTime = hbResponse.getHbTime();
} else if (lastStartTime == -1) {
} else if (lastStartTime <= 0) {
lastStartTime = hbResponse.getHbTime();
}
lastUpdateTime = hbResponse.getHbTime();
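The second half of the SHOW BROKER fix: the heartbeat handler now backfills lastStartTime for any non-positive value rather than only the -1 sentinel, so a stale 0 (plausibly left over from earlier persisted metadata; the commit does not say) can no longer surface as a nonsense start time in the broker listing.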

File: Load.java

@@ -22,7 +22,6 @@ import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.BinaryPredicate;
import org.apache.doris.analysis.CancelLoadStmt;
import org.apache.doris.analysis.CastExpr;
import org.apache.doris.analysis.Separator;
import org.apache.doris.analysis.DataDescription;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.ExprSubstitutionMap;
@@ -35,6 +34,7 @@ import org.apache.doris.analysis.LabelName;
import org.apache.doris.analysis.LoadStmt;
import org.apache.doris.analysis.NullLiteral;
import org.apache.doris.analysis.PartitionNames;
import org.apache.doris.analysis.Separator;
import org.apache.doris.analysis.SlotDescriptor;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.StorageBackend;
@@ -88,6 +88,7 @@ import org.apache.doris.qe.ConnectContext;
import org.apache.doris.system.Backend;
import org.apache.doris.task.AgentClient;
import org.apache.doris.task.AgentTaskQueue;
import org.apache.doris.task.LoadTaskInfo;
import org.apache.doris.task.PushTask;
import org.apache.doris.thrift.TBrokerScanRangeParams;
import org.apache.doris.thrift.TEtlState;
@@ -96,6 +97,11 @@ import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TPriority;
import org.apache.doris.transaction.TransactionNotFoundException;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
@@ -103,11 +109,6 @@ import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.gson.Gson;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -924,12 +925,12 @@ public class Load {
* This function should be used for broker load v2 and stream load.
* And it must be called in same db lock when planing.
*/
public static void initColumns(Table tbl, List<ImportColumnDesc> columnExprs,
public static void initColumns(Table tbl, LoadTaskInfo.ImportColumnDescs columnDescs,
Map<String, Pair<String, List<String>>> columnToHadoopFunction,
Map<String, Expr> exprsByName, Analyzer analyzer, TupleDescriptor srcTupleDesc,
Map<String, SlotDescriptor> slotDescByName, TBrokerScanRangeParams params) throws UserException {
rewriteColumns(columnExprs);
initColumns(tbl, columnExprs, columnToHadoopFunction, exprsByName, analyzer,
rewriteColumns(columnDescs);
initColumns(tbl, columnDescs.descs, columnToHadoopFunction, exprsByName, analyzer,
srcTupleDesc, slotDescByName, params, true);
}
@@ -1120,12 +1121,16 @@ public class Load {
LOG.debug("after init column, exprMap: {}", exprsByName);
}
public static void rewriteColumns(List<ImportColumnDesc> columnExprs) {
public static void rewriteColumns(LoadTaskInfo.ImportColumnDescs columnDescs) {
if (columnDescs.isColumnDescsRewrited) {
return;
}
Map<String, Expr> derivativeColumns = new HashMap<>();
// find and rewrite the derivative columns
// e.g. (v1,v2=v1+1,v3=v2+1) --> (v1, v2=v1+1, v3=v1+1+1)
// 1. find the derivative columns
for (ImportColumnDesc importColumnDesc : columnExprs) {
for (ImportColumnDesc importColumnDesc : columnDescs.descs) {
if (!importColumnDesc.isColumn()) {
if (importColumnDesc.getExpr() instanceof SlotRef) {
String columnName = ((SlotRef) importColumnDesc.getExpr()).getColumnName();
@@ -1139,6 +1144,7 @@
}
}
columnDescs.isColumnDescsRewrited = true;
}
private static void recursiveRewrite(Expr expr, Map<String, Expr> derivativeColumns) {
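This is the heart of the fix: rewriteColumns now accepts the ImportColumnDescs wrapper, returns immediately when isColumnDescsRewrited is already set, and sets the flag once the substitution is done, making the in-place rewrite idempotent no matter how many times planning revisits the same descriptors. A stripped-down sketch of the guard pattern (simplified types, not the Doris signatures):

```java
import java.util.ArrayList;
import java.util.List;

// Stripped-down sketch: the flag travels with the mutable list,
// so every caller shares one "already rewritten" state.
public class IdempotentRewriteDemo {
    static class ColumnDescs {
        final List<String> descs = new ArrayList<>();
        boolean rewritten = false; // plays the role of isColumnDescsRewrited
    }

    static void rewriteColumns(ColumnDescs cds) {
        if (cds.rewritten) {
            return; // second and later calls: nothing to do, nothing to corrupt
        }
        // ... substitute derivative column expressions in place here ...
        cds.rewritten = true;
    }

    public static void main(String[] args) {
        ColumnDescs cds = new ColumnDescs();
        cds.descs.add("c1=ifnull(c1, 0)");
        rewriteColumns(cds); // rewrites once
        rewriteColumns(cds); // no-op: the list is left exactly as it was
    }
}
```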

File: RoutineLoadJob.java

@@ -18,13 +18,13 @@
package org.apache.doris.load.routineload;
import org.apache.doris.analysis.AlterRoutineLoadStmt;
import org.apache.doris.analysis.Separator;
import org.apache.doris.analysis.CreateRoutineLoadStmt;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.ImportColumnDesc;
import org.apache.doris.analysis.ImportColumnsStmt;
import org.apache.doris.analysis.LoadStmt;
import org.apache.doris.analysis.PartitionNames;
import org.apache.doris.analysis.Separator;
import org.apache.doris.analysis.SqlParser;
import org.apache.doris.analysis.SqlScanner;
import org.apache.doris.catalog.Catalog;
@@ -90,7 +90,6 @@ import java.util.Optional;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
/**
* Routine load job is a function which stream load data from streaming medium to doris.
@@ -155,7 +154,7 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
protected long authCode;
// protected RoutineLoadDesc routineLoadDesc; // optional
protected PartitionNames partitions; // optional
protected List<ImportColumnDesc> columnDescs; // optional
protected ImportColumnDescs columnDescs; // optional
protected Expr precedingFilter; // optional
protected Expr whereExpr; // optional
protected Separator columnSeparator; // optional
@@ -345,12 +344,12 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
private void setRoutineLoadDesc(RoutineLoadDesc routineLoadDesc) {
if (routineLoadDesc != null) {
columnDescs = Lists.newArrayList();
columnDescs = new ImportColumnDescs();
if (routineLoadDesc.getColumnsInfo() != null) {
ImportColumnsStmt columnsStmt = routineLoadDesc.getColumnsInfo();
if (columnsStmt.getColumns() != null || columnsStmt.getColumns().size() != 0) {
for (ImportColumnDesc columnDesc : columnsStmt.getColumns()) {
columnDescs.add(columnDesc);
columnDescs.descs.add(columnDesc);
}
}
}
@@ -579,12 +578,11 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
}
@Override
public List<ImportColumnDesc> getColumnExprDescs() {
public ImportColumnDescs getColumnExprDescs() {
if (columnDescs == null) {
return new ArrayList<>();
return new ImportColumnDescs();
}
// use the copy of columnDescs avoid duplicated add delete condition
return columnDescs.stream().collect(Collectors.toList());
return columnDescs;
}
public String getJsonPaths() {
@@ -1351,7 +1349,7 @@ public abstract class RoutineLoadJob extends AbstractTxnStateChangeCallback impl
private String jobPropertiesToJsonString() {
Map<String, String> jobProperties = Maps.newHashMap();
jobProperties.put("partitions", partitions == null ? STAR_STRING : Joiner.on(",").join(partitions.getPartitionNames()));
jobProperties.put("columnToColumnExpr", columnDescs == null ? STAR_STRING : Joiner.on(",").join(columnDescs));
jobProperties.put("columnToColumnExpr", columnDescs == null ? STAR_STRING : Joiner.on(",").join(columnDescs.descs));
jobProperties.put("precedingFilter", precedingFilter == null ? STAR_STRING : precedingFilter.toSql());
jobProperties.put("whereExpr", whereExpr == null ? STAR_STRING : whereExpr.toSql());
if (getFormat().equalsIgnoreCase("json")) {
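Two details here are worth noting. getColumnExprDescs() previously returned a defensive copy (stream().collect(Collectors.toList()), per the removed comment, to "avoid duplicated add delete condition"); since the duplicate-append problem is now solved by the flag inside the shared wrapper, the raw object can be returned and the Collectors import dropped. And jobPropertiesToJsonString() joins columnDescs.descs rather than the wrapper itself, so the serialized job properties keep the same shape as before.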

File: BrokerScanNode.java

@@ -40,6 +40,7 @@ import org.apache.doris.load.BrokerFileGroup;
import org.apache.doris.load.Load;
import org.apache.doris.load.loadv2.LoadTask;
import org.apache.doris.system.Backend;
import org.apache.doris.task.LoadTaskInfo;
import org.apache.doris.thrift.TBrokerFileStatus;
import org.apache.doris.thrift.TBrokerRangeDesc;
import org.apache.doris.thrift.TBrokerScanRange;
@@ -230,22 +231,22 @@ public class BrokerScanNode extends LoadScanNode {
// for load job, column exprs is got from file group
// for query, there is no column exprs, they will be got from table's schema in "Load.initColumns"
List<ImportColumnDesc> columnExprs = Lists.newArrayList();
LoadTaskInfo.ImportColumnDescs columnDescs = new LoadTaskInfo.ImportColumnDescs();
if (isLoad()) {
columnExprs = context.fileGroup.getColumnExprList();
columnDescs.descs = context.fileGroup.getColumnExprList();
if (mergeType == LoadTask.MergeType.MERGE) {
columnExprs.add(ImportColumnDesc.newDeleteSignImportColumnDesc(deleteCondition));
columnDescs.descs.add(ImportColumnDesc.newDeleteSignImportColumnDesc(deleteCondition));
} else if (mergeType == LoadTask.MergeType.DELETE) {
columnExprs.add(ImportColumnDesc.newDeleteSignImportColumnDesc(new IntLiteral(1)));
columnDescs.descs.add(ImportColumnDesc.newDeleteSignImportColumnDesc(new IntLiteral(1)));
}
// add columnExpr for sequence column
if (context.fileGroup.hasSequenceCol()) {
columnExprs.add(new ImportColumnDesc(Column.SEQUENCE_COL,
columnDescs.descs.add(new ImportColumnDesc(Column.SEQUENCE_COL,
new SlotRef(null, context.fileGroup.getSequenceCol())));
}
}
Load.initColumns(targetTable, columnExprs,
Load.initColumns(targetTable, columnDescs,
context.fileGroup.getColumnToHadoopFunction(), context.exprMap, analyzer,
context.srcTupleDescriptor, context.slotDescByName, context.params);
}
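Broker load adopts the same wrapper: a fresh ImportColumnDescs is built in each planning pass, its descs field is pointed at the file group's column expression list, and the delete-sign and sequence-column descriptors are appended before Load.initColumns triggers the rewrite.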

File: StreamLoadScanNode.java

@@ -39,12 +39,12 @@ import org.apache.doris.thrift.TScanRange;
import org.apache.doris.thrift.TScanRangeLocations;
import org.apache.doris.thrift.TUniqueId;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.nio.charset.Charset;
import java.util.List;
import java.util.Map;
@@ -123,14 +123,16 @@ public class StreamLoadScanNode extends LoadScanNode {
srcTupleDesc = analyzer.getDescTbl().createTupleDescriptor("StreamLoadScanNode");
TBrokerScanRangeParams params = new TBrokerScanRangeParams();
List<ImportColumnDesc> columnExprDescs = taskInfo.getColumnExprDescs();
if (mergeType == LoadTask.MergeType.MERGE) {
columnExprDescs.add(ImportColumnDesc.newDeleteSignImportColumnDesc(deleteCondition));
} else if (mergeType == LoadTask.MergeType.DELETE) {
columnExprDescs.add(ImportColumnDesc.newDeleteSignImportColumnDesc(new IntLiteral(1)));
}
if (taskInfo.hasSequenceCol()) {
columnExprDescs.add(new ImportColumnDesc(Column.SEQUENCE_COL, new SlotRef(null, taskInfo.getSequenceCol())));
LoadTaskInfo.ImportColumnDescs columnExprDescs = taskInfo.getColumnExprDescs();
if (!columnExprDescs.isColumnDescsRewrited) {
if (mergeType == LoadTask.MergeType.MERGE) {
columnExprDescs.descs.add(ImportColumnDesc.newDeleteSignImportColumnDesc(deleteCondition));
} else if (mergeType == LoadTask.MergeType.DELETE) {
columnExprDescs.descs.add(ImportColumnDesc.newDeleteSignImportColumnDesc(new IntLiteral(1)));
}
if (taskInfo.hasSequenceCol()) {
columnExprDescs.descs.add(new ImportColumnDesc(Column.SEQUENCE_COL, new SlotRef(null, taskInfo.getSequenceCol())));
}
}
Load.initColumns(dstTable, columnExprDescs, null /* no hadoop function */,
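Stream load planning is where the repeated work actually happened: the task's ImportColumnDescs outlives a single plan, so re-planning used to append the delete-sign and sequence-column descriptors again and rewrite the already-rewritten expressions. Both the appends and, via Load.initColumns, the rewrite itself are now fenced behind isColumnDescsRewrited.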

File: LoadTaskInfo.java

@@ -17,14 +17,16 @@
package org.apache.doris.task;
import org.apache.doris.analysis.Separator;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.ImportColumnDesc;
import org.apache.doris.analysis.PartitionNames;
import org.apache.doris.analysis.Separator;
import org.apache.doris.load.loadv2.LoadTask;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TFileType;
import com.google.common.collect.Lists;
import java.util.List;
public interface LoadTaskInfo {
@@ -46,11 +48,17 @@ public interface LoadTaskInfo {
public boolean isFuzzyParse();
public boolean isNumAsString();
public String getPath();
public List<ImportColumnDesc> getColumnExprDescs();
public ImportColumnDescs getColumnExprDescs();
public boolean isStrictMode();
public Expr getPrecedingFilter();
public Expr getWhereExpr();
public Separator getColumnSeparator();
public Separator getLineDelimiter();
public static class ImportColumnDescs {
public List<ImportColumnDesc> descs = Lists.newArrayList();
public boolean isColumnDescsRewrited = false;
}
}
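The holder is deliberately bare: a public mutable list plus a public flag, with the flag set only by Load.rewriteColumns. Reproduced standalone so it compiles outside the Doris tree (String stands in for ImportColumnDesc; the placeholder expression is made up):

```java
import java.util.ArrayList;
import java.util.List;

public class ImportColumnDescsDemo {
    // Same shape as LoadTaskInfo.ImportColumnDescs in the hunk above.
    static class ImportColumnDescs {
        List<String> descs = new ArrayList<>();
        boolean isColumnDescsRewrited = false;
    }

    public static void main(String[] args) {
        ImportColumnDescs columnDescs = new ImportColumnDescs();
        columnDescs.descs.add("c1");
        if (!columnDescs.isColumnDescsRewrited) {
            // One-time additions (delete sign, sequence column) belong
            // here, mirroring the guard in StreamLoadScanNode.
            columnDescs.descs.add("delete_sign_expr"); // placeholder name
        }
        System.out.println(columnDescs.descs); // [c1, delete_sign_expr]
    }
}
```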

File: StreamLoadTask.java

@@ -19,7 +19,6 @@ package org.apache.doris.task;
import org.apache.doris.analysis.Separator;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.ImportColumnDesc;
import org.apache.doris.analysis.ImportColumnsStmt;
import org.apache.doris.analysis.ImportWhereStmt;
import org.apache.doris.analysis.PartitionNames;
@@ -43,7 +42,6 @@ import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import java.io.StringReader;
import java.util.List;
public class StreamLoadTask implements LoadTaskInfo {
@@ -60,7 +58,7 @@ public class StreamLoadTask implements LoadTaskInfo {
private boolean fuzzyParse;
// optional
private List<ImportColumnDesc> columnExprDescs = Lists.newArrayList();
private ImportColumnDescs columnExprDescs = new ImportColumnDescs();
private Expr whereExpr;
private Separator columnSeparator;
private Separator lineDelimiter;
@@ -103,7 +101,7 @@ public class StreamLoadTask implements LoadTaskInfo {
return formatType;
}
public List<ImportColumnDesc> getColumnExprDescs() {
public ImportColumnDescs getColumnExprDescs() {
return columnExprDescs;
}
@@ -309,7 +307,7 @@
}
if (columnsStmt.getColumns() != null && !columnsStmt.getColumns().isEmpty()) {
columnExprDescs = columnsStmt.getColumns();
columnExprDescs.descs = columnsStmt.getColumns();
}
}
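On the StreamLoadTask side the change is mechanical: the field becomes an ImportColumnDescs created once per task, which matches the lifetime the flag needs (one task, at most one rewrite), and parsed columns are assigned into columnExprDescs.descs.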

File: LoadStmtTest.java

@@ -30,18 +30,16 @@ import org.apache.doris.load.loadv2.LoadTask;
import org.apache.doris.mysql.privilege.PaloAuth;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.qe.ConnectContext;
import com.google.common.collect.Lists;
import org.apache.doris.task.LoadTaskInfo;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import com.google.common.collect.Lists;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import mockit.Expectations;
import mockit.Injectable;
@@ -143,21 +141,42 @@ public class LoadStmtTest {
@Test
public void testRewrite() throws Exception{
LoadTaskInfo.ImportColumnDescs columnDescs = new LoadTaskInfo.ImportColumnDescs();
List<ImportColumnDesc> columns1 = getColumns("c1,c2,c3,tmp_c4=c1 + 1, tmp_c5 = tmp_c4+1");
Load.rewriteColumns(columns1);
columnDescs.descs = columns1;
columnDescs.isColumnDescsRewrited = false;
Load.rewriteColumns(columnDescs);
String orig = "`c1` + 1 + 1";
Assert.assertEquals(orig, columns1.get(4).getExpr().toString());
List<ImportColumnDesc> columns2 = getColumns("c1,c2,c3,tmp_c5 = tmp_c4+1, tmp_c4=c1 + 1");
columnDescs.descs = columns2;
columnDescs.isColumnDescsRewrited = false;
String orig2 = "`tmp_c4` + 1";
Load.rewriteColumns(columns2);
Load.rewriteColumns(columnDescs);
Assert.assertEquals(orig2, columns2.get(3).getExpr().toString());
List<ImportColumnDesc> columns3 = getColumns("c1,c2,c3");
columnDescs.descs = columns3;
columnDescs.isColumnDescsRewrited = false;
String orig3 = "c3";
Load.rewriteColumns(columns3);
Load.rewriteColumns(columnDescs);
Assert.assertEquals(orig3, columns3.get(2).toString());
List<ImportColumnDesc> columns4 = getColumns("c1, c1=ifnull(c1, 0), c2=ifnull(c1, 0)");
columnDescs.descs = columns4;
columnDescs.isColumnDescsRewrited = false;
Load.rewriteColumns(columnDescs);
Assert.assertEquals("c1", columns4.get(0).toString());
Assert.assertEquals("c1=ifnull(`c1`, 0)", columns4.get(1).toString());
Assert.assertEquals("c2=ifnull(ifnull(`c1`, 0), 0)", columns4.get(2).toString());
// will not rewrite again
Assert.assertTrue(columnDescs.isColumnDescsRewrited);
Load.rewriteColumns(columnDescs);
Assert.assertEquals("c1", columns4.get(0).toString());
Assert.assertEquals("c1=ifnull(`c1`, 0)", columns4.get(1).toString());
Assert.assertEquals("c2=ifnull(ifnull(`c1`, 0), 0)", columns4.get(2).toString());
}
private List<ImportColumnDesc> getColumns(String columns) throws Exception {
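The extended test covers both behaviors: the ifnull case asserts the single-pass result c2=ifnull(ifnull(`c1`, 0), 0), and the final block calls Load.rewriteColumns a second time on the already-flagged descriptors and asserts every expression is unchanged, which is the regression check for the original StackOverflowError.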