[Dynamic Partition] Unify dynamic partition name and range (#3193)
Generates partition names based on the granularity. eg: Year:prefix2020 Day: prefix20200325 Week: prefix2020_#, # is the week of year. At the same time, for all granularity, align the partition range to 00:00:00.
This commit is contained in:
@ -41,7 +41,8 @@ public class DynamicPartitionProperty{
|
||||
this.exist = true;
|
||||
this.enable = Boolean.parseBoolean(properties.get(ENABLE));
|
||||
this.timeUnit = properties.get(TIME_UNIT);
|
||||
this.start = Integer.parseInt(properties.get(START));
|
||||
// In order to compatible dynamic add partition version
|
||||
this.start = Integer.parseInt(properties.getOrDefault(START, String.valueOf(Integer.MIN_VALUE)));
|
||||
this.end = Integer.parseInt(properties.get(END));
|
||||
this.prefix = properties.get(PREFIX);
|
||||
this.buckets = Integer.parseInt(properties.get(BUCKETS));
|
||||
|
||||
@ -163,13 +163,7 @@ public class PartitionKey implements Comparable<PartitionKey>, Writable {
|
||||
return ret;
|
||||
}
|
||||
}
|
||||
if (this_key_len < other_key_len) {
|
||||
return -1;
|
||||
} else if (this_key_len > other_key_len) {
|
||||
return 1;
|
||||
} else {
|
||||
return 0;
|
||||
}
|
||||
return Integer.compare(this_key_len, other_key_len);
|
||||
}
|
||||
|
||||
// return: ("100", "200", "300")
|
||||
|
||||
@ -175,7 +175,7 @@ public class DynamicPartitionScheduler extends MasterDaemon {
|
||||
PartitionKeyDesc partitionKeyDesc = new PartitionKeyDesc(Collections.singletonList(lowerValue), Collections.singletonList(upperValue));
|
||||
HashMap<String, String> partitionProperties = new HashMap<>(1);
|
||||
partitionProperties.put("replication_num", String.valueOf(DynamicPartitionUtil.estimateReplicateNum(olapTable)));
|
||||
String partitionName = dynamicPartitionProperty.getPrefix() + DynamicPartitionUtil.getFormattedPartitionName(prevBorder);
|
||||
String partitionName = dynamicPartitionProperty.getPrefix() + DynamicPartitionUtil.getFormattedPartitionName(prevBorder, dynamicPartitionProperty.getTimeUnit());
|
||||
SingleRangePartitionDesc rangePartitionDesc = new SingleRangePartitionDesc(true, partitionName,
|
||||
partitionKeyDesc, partitionProperties);
|
||||
|
||||
|
||||
@ -52,7 +52,6 @@ import com.google.common.collect.Lists;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
@ -139,7 +138,7 @@ public class PartitionsProcDir implements ProcDirInterface {
|
||||
|
||||
public ProcResult fetchResultByFilter(Map<String, Expr> filterMap, List<OrderByPair> orderByPairs, LimitElement limitElement) throws AnalysisException {
|
||||
List<List<Comparable>> partitionInfos = getPartitionInfos();
|
||||
List<List<Comparable>> filterPartitionInfos = null;
|
||||
List<List<Comparable>> filterPartitionInfos;
|
||||
//where
|
||||
if (filterMap == null || filterMap.isEmpty()) {
|
||||
filterPartitionInfos = partitionInfos;
|
||||
@ -166,10 +165,10 @@ public class PartitionsProcDir implements ProcDirInterface {
|
||||
|
||||
// order by
|
||||
if (orderByPairs != null) {
|
||||
ListComparator<List<Comparable>> comparator = null;
|
||||
ListComparator<List<Comparable>> comparator;
|
||||
OrderByPair[] orderByPairArr = new OrderByPair[orderByPairs.size()];
|
||||
comparator = new ListComparator<>(orderByPairs.toArray(orderByPairArr));
|
||||
Collections.sort(filterPartitionInfos, comparator);
|
||||
filterPartitionInfos.sort(comparator);
|
||||
}
|
||||
|
||||
//limit
|
||||
|
||||
@ -36,14 +36,19 @@ import org.apache.doris.common.ErrorCode;
|
||||
import org.apache.doris.common.ErrorReport;
|
||||
import org.apache.doris.common.FeNameFormat;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import com.google.common.base.Strings;
|
||||
|
||||
import java.text.ParseException;
|
||||
import java.text.SimpleDateFormat;
|
||||
import java.util.Calendar;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
public class DynamicPartitionUtil {
|
||||
private static final Logger LOG = LogManager.getLogger(DynamicPartitionUtil.class);
|
||||
|
||||
private static final String TIMESTAMP_FORMAT = "yyyyMMdd";
|
||||
private static final String DATE_FORMAT = "yyyy-MM-dd";
|
||||
private static final String DATETIME_FORMAT = "yyyy-MM-dd HH:mm:ss";
|
||||
@ -261,8 +266,23 @@ public class DynamicPartitionUtil {
|
||||
}
|
||||
}
|
||||
|
||||
public static String getFormattedPartitionName(String name) {
|
||||
return name.replace("-", "").replace(":", "").replace(" ", "");
|
||||
public static String getFormattedPartitionName(String name, String timeUnit) {
|
||||
name = name.replace("-", "").replace(":", "").replace(" ", "");
|
||||
if (timeUnit.equalsIgnoreCase(TimeUnit.DAY.toString())) {
|
||||
return name.substring(0, 8);
|
||||
} else if (timeUnit.equalsIgnoreCase(TimeUnit.MONTH.toString())) {
|
||||
return name.substring(0, 6);
|
||||
} else {
|
||||
name = name.substring(0, 8);
|
||||
Calendar calendar = Calendar.getInstance();
|
||||
try {
|
||||
calendar.setTime(new SimpleDateFormat("yyyyMMdd").parse(name));
|
||||
} catch (ParseException e) {
|
||||
LOG.warn("Format dynamic partition name error. Error={}", e.getMessage());
|
||||
return name;
|
||||
}
|
||||
return String.format("%s_%02d", calendar.get(Calendar.YEAR), calendar.get(Calendar.WEEK_OF_YEAR));
|
||||
}
|
||||
}
|
||||
|
||||
public static String getPartitionRange(String timeUnit, int offset, Calendar calendar, String format) {
|
||||
@ -273,6 +293,10 @@ public class DynamicPartitionUtil {
|
||||
} else {
|
||||
calendar.add(Calendar.MONTH, offset);
|
||||
}
|
||||
// dynamic partition's time accuracy is DAY
|
||||
calendar.set(Calendar.HOUR_OF_DAY, 0);
|
||||
calendar.set(Calendar.MINUTE, 0);
|
||||
calendar.set(Calendar.SECOND, 0);
|
||||
SimpleDateFormat dateFormat = new SimpleDateFormat(format);
|
||||
return dateFormat.format(calendar.getTime());
|
||||
}
|
||||
|
||||
@ -634,7 +634,7 @@ public class FrontendServiceImpl implements FrontendService.Iface {
|
||||
result.setTxnId(loadTxnBeginImpl(request, clientAddr));
|
||||
} catch (DuplicatedRequestException e) {
|
||||
// this is a duplicate request, just return previous txn id
|
||||
LOG.info("deplicate request for stream load. request id: {}, txn: {}", e.getDuplicatedRequestId(), e.getTxnId());
|
||||
LOG.info("duplicate request for stream load. request id: {}, txn: {}", e.getDuplicatedRequestId(), e.getTxnId());
|
||||
result.setTxnId(e.getTxnId());
|
||||
} catch (LabelAlreadyUsedException e) {
|
||||
status.setStatus_code(TStatusCode.LABEL_ALREADY_EXISTS);
|
||||
|
||||
Reference in New Issue
Block a user