From 6d8391e8b5240115519c422e0ac2eb3444294b04 Mon Sep 17 00:00:00 2001 From: DongLiang-0 <46414265+DongLiang-0@users.noreply.github.com> Date: Mon, 9 Oct 2023 09:56:26 +0800 Subject: [PATCH] [fix](load)fix use regex split partition may cause backtracking (#24903) --- .../org/apache/doris/qe/MultiLoadMgr.java | 41 ++++++++++--------- .../org/apache/doris/task/StreamLoadTask.java | 10 ++--- 2 files changed, 27 insertions(+), 24 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/MultiLoadMgr.java b/fe/fe-core/src/main/java/org/apache/doris/qe/MultiLoadMgr.java index 27f947aaf5..2d1f512e29 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/MultiLoadMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/MultiLoadMgr.java @@ -54,6 +54,7 @@ import org.apache.logging.log4j.Logger; import org.awaitility.Awaitility; import java.io.StringReader; +import java.util.Arrays; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -61,6 +62,7 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.stream.Collectors; // Class used to record state of multi-load operation public class MultiLoadMgr { @@ -104,11 +106,11 @@ public class MultiLoadMgr { // Add one load job private void load(String fullDbName, String label, - String subLabel, String table, - List> files, - TNetworkAddress fileAddr, - Map properties, - long timestamp) throws DdlException { + String subLabel, String table, + List> files, + TNetworkAddress fileAddr, + Map properties, + long timestamp) throws DdlException { LabelName multiLabel = new LabelName(fullDbName, label); lock.writeLock().lock(); try { @@ -250,9 +252,9 @@ public class MultiLoadMgr { } public void addFile(String subLabel, String table, List> files, - TNetworkAddress fileAddr, - Map properties, - long timestamp) throws DdlException { + TNetworkAddress fileAddr, + Map properties, + long timestamp) throws DdlException { if (isSubLabelUsed(subLabel, timestamp)) { // sub label is used and this is a retry request. @@ -334,7 +336,6 @@ public class MultiLoadMgr { return backendId; } - public LoadStmt toLoadStmt() throws DdlException { LabelName commitLabel = multiLabel; @@ -380,8 +381,8 @@ public class MultiLoadMgr { private Set timestamps = Sets.newHashSet(); public TableLoadDesc(String tbl, String label, List> files, - TNetworkAddress address, Map properties, - long timestamp) { + TNetworkAddress address, Map properties, + long timestamp) { this.tbl = tbl; this.filesByLabel = Maps.newLinkedHashMap(); @@ -415,7 +416,6 @@ public class MultiLoadMgr { timestamps.add(timestamp); } - public Long getBackendId() { return backendId; } @@ -463,13 +463,16 @@ public class MultiLoadMgr { } } if (properties.get(LoadStmt.KEY_IN_PARAM_PARTITIONS) != null) { - String[] partNames = properties.get(LoadStmt.KEY_IN_PARAM_PARTITIONS) - .trim().split("\\s*,\\s*"); - partitionNames = new PartitionNames(false, Lists.newArrayList(partNames)); + String[] splitPartNames = properties.get(LoadStmt.KEY_IN_PARAM_PARTITIONS).trim().split(","); + List partNames = Arrays.stream(splitPartNames).map(String::trim) + .collect(Collectors.toList()); + partitionNames = new PartitionNames(false, partNames); } else if (properties.get(LoadStmt.KEY_IN_PARAM_TEMP_PARTITIONS) != null) { - String[] partNames = properties.get(LoadStmt.KEY_IN_PARAM_TEMP_PARTITIONS) - .trim().split("\\s*,\\s*"); - partitionNames = new PartitionNames(true, Lists.newArrayList(partNames)); + String[] splitTempPartNames = properties.get(LoadStmt.KEY_IN_PARAM_TEMP_PARTITIONS).trim() + .split(","); + List tempPartNames = Arrays.stream(splitTempPartNames).map(String::trim) + .collect(Collectors.toList()); + partitionNames = new PartitionNames(true, tempPartNames); } if (properties.get(LoadStmt.KEY_IN_PARAM_MERGE_TYPE) != null) { mergeType = LoadTask.MergeType.valueOf(properties.get(LoadStmt.KEY_IN_PARAM_MERGE_TYPE)); @@ -486,7 +489,7 @@ public class MultiLoadMgr { jsonPaths = properties.getOrDefault(LoadStmt.KEY_IN_PARAM_JSONPATHS, ""); jsonRoot = properties.getOrDefault(LoadStmt.KEY_IN_PARAM_JSONROOT, ""); fuzzyParse = Boolean.valueOf( - properties.getOrDefault(LoadStmt.KEY_IN_PARAM_FUZZY_PARSE, "false")); + properties.getOrDefault(LoadStmt.KEY_IN_PARAM_FUZZY_PARSE, "false")); } } DataDescription dataDescription = new DataDescription(tbl, partitionNames, files, null, columnSeparator, diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java index c99c720ee0..4bcf28da15 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/StreamLoadTask.java @@ -37,13 +37,13 @@ import org.apache.doris.thrift.TStreamLoadPutRequest; import org.apache.doris.thrift.TUniqueId; import com.google.common.base.Strings; -import com.google.common.collect.Lists; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.io.StringReader; import java.util.Arrays; import java.util.List; +import java.util.stream.Collectors; public class StreamLoadTask implements LoadTaskInfo { @@ -269,7 +269,6 @@ public class StreamLoadTask implements LoadTaskInfo { return !Strings.isNullOrEmpty(sequenceCol); } - @Override public String getSequenceCol() { return sequenceCol; @@ -358,11 +357,12 @@ public class StreamLoadTask implements LoadTaskInfo { headerType = request.getHeaderType(); } if (request.isSetPartitions()) { - String[] partNames = request.getPartitions().trim().split("\\s*,\\s*"); + String[] splitPartNames = request.getPartitions().trim().split(","); + List partNames = Arrays.stream(splitPartNames).map(String::trim).collect(Collectors.toList()); if (request.isSetIsTempPartition()) { - partitions = new PartitionNames(request.isIsTempPartition(), Lists.newArrayList(partNames)); + partitions = new PartitionNames(request.isIsTempPartition(), partNames); } else { - partitions = new PartitionNames(false, Lists.newArrayList(partNames)); + partitions = new PartitionNames(false, partNames); } } switch (request.getFileType()) {