[fix](load)fix use regex split partition may cause backtracking (#24903)

This commit is contained in:
DongLiang-0
2023-10-09 09:56:26 +08:00
committed by GitHub
parent f41b6a5fc3
commit 6d8391e8b5
2 changed files with 27 additions and 24 deletions

View File

@ -54,6 +54,7 @@ import org.apache.logging.log4j.Logger;
import org.awaitility.Awaitility;
import java.io.StringReader;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@ -61,6 +62,7 @@ import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
// Class used to record state of multi-load operation
public class MultiLoadMgr {
@ -104,11 +106,11 @@ public class MultiLoadMgr {
// Add one load job
private void load(String fullDbName, String label,
String subLabel, String table,
List<Pair<String, Long>> files,
TNetworkAddress fileAddr,
Map<String, String> properties,
long timestamp) throws DdlException {
String subLabel, String table,
List<Pair<String, Long>> files,
TNetworkAddress fileAddr,
Map<String, String> properties,
long timestamp) throws DdlException {
LabelName multiLabel = new LabelName(fullDbName, label);
lock.writeLock().lock();
try {
@ -250,9 +252,9 @@ public class MultiLoadMgr {
}
public void addFile(String subLabel, String table, List<Pair<String, Long>> files,
TNetworkAddress fileAddr,
Map<String, String> properties,
long timestamp) throws DdlException {
TNetworkAddress fileAddr,
Map<String, String> properties,
long timestamp) throws DdlException {
if (isSubLabelUsed(subLabel, timestamp)) {
// sub label is used and this is a retry request.
@ -334,7 +336,6 @@ public class MultiLoadMgr {
return backendId;
}
public LoadStmt toLoadStmt() throws DdlException {
LabelName commitLabel = multiLabel;
@ -380,8 +381,8 @@ public class MultiLoadMgr {
private Set<Long> timestamps = Sets.newHashSet();
public TableLoadDesc(String tbl, String label, List<Pair<String, Long>> files,
TNetworkAddress address, Map<String, String> properties,
long timestamp) {
TNetworkAddress address, Map<String, String> properties,
long timestamp) {
this.tbl = tbl;
this.filesByLabel = Maps.newLinkedHashMap();
@ -415,7 +416,6 @@ public class MultiLoadMgr {
timestamps.add(timestamp);
}
public Long getBackendId() {
return backendId;
}
@ -463,13 +463,16 @@ public class MultiLoadMgr {
}
}
if (properties.get(LoadStmt.KEY_IN_PARAM_PARTITIONS) != null) {
String[] partNames = properties.get(LoadStmt.KEY_IN_PARAM_PARTITIONS)
.trim().split("\\s*,\\s*");
partitionNames = new PartitionNames(false, Lists.newArrayList(partNames));
String[] splitPartNames = properties.get(LoadStmt.KEY_IN_PARAM_PARTITIONS).trim().split(",");
List<String> partNames = Arrays.stream(splitPartNames).map(String::trim)
.collect(Collectors.toList());
partitionNames = new PartitionNames(false, partNames);
} else if (properties.get(LoadStmt.KEY_IN_PARAM_TEMP_PARTITIONS) != null) {
String[] partNames = properties.get(LoadStmt.KEY_IN_PARAM_TEMP_PARTITIONS)
.trim().split("\\s*,\\s*");
partitionNames = new PartitionNames(true, Lists.newArrayList(partNames));
String[] splitTempPartNames = properties.get(LoadStmt.KEY_IN_PARAM_TEMP_PARTITIONS).trim()
.split(",");
List<String> tempPartNames = Arrays.stream(splitTempPartNames).map(String::trim)
.collect(Collectors.toList());
partitionNames = new PartitionNames(true, tempPartNames);
}
if (properties.get(LoadStmt.KEY_IN_PARAM_MERGE_TYPE) != null) {
mergeType = LoadTask.MergeType.valueOf(properties.get(LoadStmt.KEY_IN_PARAM_MERGE_TYPE));
@ -486,7 +489,7 @@ public class MultiLoadMgr {
jsonPaths = properties.getOrDefault(LoadStmt.KEY_IN_PARAM_JSONPATHS, "");
jsonRoot = properties.getOrDefault(LoadStmt.KEY_IN_PARAM_JSONROOT, "");
fuzzyParse = Boolean.valueOf(
properties.getOrDefault(LoadStmt.KEY_IN_PARAM_FUZZY_PARSE, "false"));
properties.getOrDefault(LoadStmt.KEY_IN_PARAM_FUZZY_PARSE, "false"));
}
}
DataDescription dataDescription = new DataDescription(tbl, partitionNames, files, null, columnSeparator,

View File

@ -37,13 +37,13 @@ import org.apache.doris.thrift.TStreamLoadPutRequest;
import org.apache.doris.thrift.TUniqueId;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.StringReader;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
public class StreamLoadTask implements LoadTaskInfo {
@ -269,7 +269,6 @@ public class StreamLoadTask implements LoadTaskInfo {
return !Strings.isNullOrEmpty(sequenceCol);
}
@Override
public String getSequenceCol() {
return sequenceCol;
@ -358,11 +357,12 @@ public class StreamLoadTask implements LoadTaskInfo {
headerType = request.getHeaderType();
}
if (request.isSetPartitions()) {
String[] partNames = request.getPartitions().trim().split("\\s*,\\s*");
String[] splitPartNames = request.getPartitions().trim().split(",");
List<String> partNames = Arrays.stream(splitPartNames).map(String::trim).collect(Collectors.toList());
if (request.isSetIsTempPartition()) {
partitions = new PartitionNames(request.isIsTempPartition(), Lists.newArrayList(partNames));
partitions = new PartitionNames(request.isIsTempPartition(), partNames);
} else {
partitions = new PartitionNames(false, Lists.newArrayList(partNames));
partitions = new PartitionNames(false, partNames);
}
}
switch (request.getFileType()) {