[enhance](mtmv) MTMV Use partial partition of base table (#31632)
MTMV adds three properties:
- partition_sync_limit: an integer
- partition_sync_time_unit: DAY / MONTH / YEAR
- partition_date_format: a date format such as "%Y-%m-%d" or "%Y%m%d"

For example, if the current time is 2020-02-03 20:10:10:
- If partition_sync_limit is set to 1 and partition_sync_time_unit is set to DAY, only partitions with a time greater than or equal to 2020-02-03 00:00:00 are synchronized to the MTMV.
- If partition_sync_limit is set to 1 and partition_sync_time_unit is set to MONTH, only partitions with a time greater than or equal to 2020-02-01 00:00:00 are synchronized to the MTMV.
- If partition_sync_limit is set to 1 and partition_sync_time_unit is set to YEAR, only partitions with a time greater than or equal to 2020-01-01 00:00:00 are synchronized to the MTMV.
- If partition_sync_limit is set to 3 and partition_sync_time_unit is set to MONTH, only partitions with a time greater than or equal to 2019-12-01 00:00:00 are synchronized to the MTMV.
- If partition_sync_limit is set to 4 and partition_sync_time_unit is set to DAY, only partitions with a time greater than or equal to 2020-01-31 00:00:00 are synchronized to the MTMV.
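A minimal usage sketch, adapted from the regression test added in this commit (the table and view names here are illustrative):

    CREATE MATERIALIZED VIEW mv_limit_demo
    BUILD DEFERRED REFRESH AUTO ON MANUAL
    partition by(`k2`)
    DISTRIBUTED BY RANDOM BUCKETS 2
    PROPERTIES (
        'replication_num' = '1',
        'partition_sync_limit' = '2',            -- keep only partitions within the last 2 time units
        'partition_sync_time_unit' = 'MONTH',    -- DAY/MONTH/YEAR; defaults to DAY when unset
        'partition_date_format' = '%Y%m%d'       -- needed when the partition column is not a date type
    )
    AS SELECT * FROM base_table;

When partition_sync_limit is unset or empty, the parsed limit defaults to -1 and no time filter is applied, so all partitions of the base table are synchronized.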
@@ -20,6 +20,7 @@ package org.apache.doris.catalog;
import org.apache.doris.analysis.PartitionKeyDesc;
import org.apache.doris.analysis.PartitionValue;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.mtmv.MTMVUtil;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
@@ -29,6 +30,7 @@ import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

@@ -99,6 +101,25 @@ public class ListPartitionItem extends PartitionItem {
        return PartitionKeyDesc.createIn(Lists.newArrayList(res));
    }

    @Override
    public boolean isGreaterThanSpecifiedTime(int pos, Optional<String> dateFormatOptional, long nowTruncSubSec)
            throws AnalysisException {
        for (PartitionKey partitionKey : partitionKeys) {
            if (partitionKey.getKeys().size() <= pos) {
                throw new AnalysisException(
                        String.format("toPartitionKeyDesc IndexOutOfBounds, partitionKey: %s, pos: %d",
                                partitionKey.toString(),
                                pos));
            }
            if (!isDefaultPartition() && MTMVUtil.getExprTimeSec(partitionKey.getKeys().get(pos), dateFormatOptional)
                    >= nowTruncSubSec) {
                // As long as one of the partitionKeys meets the requirement, this partition needs to be retained
                return true;
            }
        }
        return false;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(partitionKeys.size());
@@ -38,6 +38,7 @@ import org.apache.doris.mtmv.MTMVRefreshPartitionSnapshot;
import org.apache.doris.mtmv.MTMVRefreshSnapshot;
import org.apache.doris.mtmv.MTMVRelation;
import org.apache.doris.mtmv.MTMVStatus;
import org.apache.doris.mtmv.MTMVUtil;
import org.apache.doris.persist.gson.GsonUtils;

import com.google.common.collect.Maps;
@@ -217,7 +218,7 @@ public class MTMV extends OlapTable {
    public long getGracePeriod() {
        readMvLock();
        try {
-           if (mvProperties.containsKey(PropertyAnalyzer.PROPERTIES_GRACE_PERIOD)) {
+           if (!StringUtils.isEmpty(mvProperties.get(PropertyAnalyzer.PROPERTIES_GRACE_PERIOD))) {
                return Long.parseLong(mvProperties.get(PropertyAnalyzer.PROPERTIES_GRACE_PERIOD)) * 1000;
            } else {
                return 0L;
@@ -243,7 +244,7 @@ public class MTMV extends OlapTable {
    public int getRefreshPartitionNum() {
        readMvLock();
        try {
-           if (mvProperties.containsKey(PropertyAnalyzer.PROPERTIES_REFRESH_PARTITION_NUM)) {
+           if (!StringUtils.isEmpty(mvProperties.get(PropertyAnalyzer.PROPERTIES_REFRESH_PARTITION_NUM))) {
                int value = Integer.parseInt(mvProperties.get(PropertyAnalyzer.PROPERTIES_REFRESH_PARTITION_NUM));
                return value < 1 ? MTMVTask.DEFAULT_REFRESH_PARTITION_NUM : value;
            } else {
@@ -257,7 +258,7 @@ public class MTMV extends OlapTable {
    public Set<String> getExcludedTriggerTables() {
        readMvLock();
        try {
-           if (!mvProperties.containsKey(PropertyAnalyzer.PROPERTIES_EXCLUDED_TRIGGER_TABLES)) {
+           if (StringUtils.isEmpty(mvProperties.get(PropertyAnalyzer.PROPERTIES_EXCLUDED_TRIGGER_TABLES))) {
                return Sets.newHashSet();
            }
            String[] split = mvProperties.get(PropertyAnalyzer.PROPERTIES_EXCLUDED_TRIGGER_TABLES).split(",");
@@ -326,8 +327,10 @@ public class MTMV extends OlapTable {
            return Maps.newHashMap();
        }
        Map<PartitionKeyDesc, Set<Long>> res = new HashMap<>();
-       Map<Long, PartitionItem> relatedPartitionItems = mvPartitionInfo.getRelatedTable().getAndCopyPartitionItems();
        int relatedColPos = mvPartitionInfo.getRelatedColPos();
+       Map<Long, PartitionItem> relatedPartitionItems = mvPartitionInfo.getRelatedTable()
+               .getPartitionItemsByTimeFilter(relatedColPos,
+                       MTMVUtil.generateMTMVPartitionSyncConfigByProperties(mvProperties));
        for (Entry<Long, PartitionItem> entry : relatedPartitionItems.entrySet()) {
            PartitionKeyDesc partitionKeyDesc = entry.getValue().toPartitionKeyDesc(relatedColPos);
            if (res.containsKey(partitionKeyDesc)) {
@@ -23,6 +23,7 @@ import org.apache.doris.common.io.Writable;

import java.util.Comparator;
import java.util.Map;
import java.util.Optional;

public abstract class PartitionItem implements Comparable<PartitionItem>, Writable {
    public static final Comparator<Map.Entry<Long, PartitionItem>> ITEM_MAP_ENTRY_COMPARATOR =
@@ -46,4 +47,17 @@ public abstract class PartitionItem implements Comparable<PartitionItem>, Writable {
     * @throws AnalysisException
     */
    public abstract PartitionKeyDesc toPartitionKeyDesc(int pos) throws AnalysisException;

    /**
     * Check whether the partition meets the time requirement.
     *
     * @param pos the position of the partition column to check among all partition columns
     * @param dateFormatOptional the format used to convert non-date values to a date
     * @param nowTruncSubSec the threshold time, in seconds, to compare against
     * @return true if the partition should be retained
     * @throws AnalysisException
     */
    public abstract boolean isGreaterThanSpecifiedTime(int pos, Optional<String> dateFormatOptional,
            long nowTruncSubSec)
            throws AnalysisException;
}
@@ -18,12 +18,15 @@
package org.apache.doris.catalog;

import org.apache.doris.analysis.PartitionKeyDesc;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.util.RangeUtils;
import org.apache.doris.mtmv.MTMVUtil;

import com.google.common.collect.Range;

import java.io.DataOutput;
import java.io.IOException;
import java.util.Optional;

public class RangePartitionItem extends PartitionItem {
    private Range<PartitionKey> partitionKeyRange;
@@ -60,6 +63,21 @@ public class RangePartitionItem extends PartitionItem {
        return toPartitionKeyDesc();
    }

    @Override
    public boolean isGreaterThanSpecifiedTime(int pos, Optional<String> dateFormatOptional, long nowTruncSubSec)
            throws AnalysisException {
        PartitionKey partitionKey = partitionKeyRange.upperEndpoint();
        if (partitionKey.getKeys().size() <= pos) {
            throw new AnalysisException(
                    String.format("toPartitionKeyDesc IndexOutOfBounds, partitionKey: %s, pos: %d",
                            partitionKey.toString(),
                            pos));
        }
        // If the upper limit of the partition range meets the requirement, this partition needs to be retained
        return !isDefaultPartition() && MTMVUtil.getExprTimeSec(partitionKey.getKeys().get(pos), dateFormatOptional)
                >= nowTruncSubSec;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        RangeUtils.writeRange(out, partitionKeyRange);
@@ -165,6 +165,9 @@ public class PropertyAnalyzer {
    public static final String PROPERTIES_EXCLUDED_TRIGGER_TABLES = "excluded_trigger_tables";
    public static final String PROPERTIES_REFRESH_PARTITION_NUM = "refresh_partition_num";
    public static final String PROPERTIES_WORKLOAD_GROUP = "workload_group";
    public static final String PROPERTIES_PARTITION_SYNC_LIMIT = "partition_sync_limit";
    public static final String PROPERTIES_PARTITION_TIME_UNIT = "partition_sync_time_unit";
    public static final String PROPERTIES_PARTITION_DATE_FORMAT = "partition_date_format";
    // For unique key data model, the feature Merge-on-Write will leverage a primary
    // key index and a delete-bitmap to mark duplicate keys as deleted in load stage,
    // which can avoid the merging cost in read stage, and accelerate the aggregation
@@ -0,0 +1,57 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.mtmv;

import java.util.Optional;

public class MTMVPartitionSyncConfig {
    private int syncLimit;
    private MTMVPartitionSyncTimeUnit timeUnit;
    private Optional<String> dateFormat;

    public MTMVPartitionSyncConfig(int syncLimit, MTMVPartitionSyncTimeUnit timeUnit,
            Optional<String> dateFormat) {
        this.syncLimit = syncLimit;
        this.timeUnit = timeUnit;
        this.dateFormat = dateFormat;
    }

    public int getSyncLimit() {
        return syncLimit;
    }

    public void setSyncLimit(int syncLimit) {
        this.syncLimit = syncLimit;
    }

    public MTMVPartitionSyncTimeUnit getTimeUnit() {
        return timeUnit;
    }

    public void setTimeUnit(MTMVPartitionSyncTimeUnit timeUnit) {
        this.timeUnit = timeUnit;
    }

    public Optional<String> getDateFormat() {
        return dateFormat;
    }

    public void setDateFormat(Optional<String> dateFormat) {
        this.dateFormat = dateFormat;
    }
}
@@ -0,0 +1,36 @@
// (standard ASF Apache License 2.0 header, as above)

package org.apache.doris.mtmv;

import java.util.Optional;

public enum MTMVPartitionSyncTimeUnit {
    YEAR,
    MONTH,
    DAY;

    public static Optional<MTMVPartitionSyncTimeUnit> fromString(String unit) {
        for (MTMVPartitionSyncTimeUnit u : MTMVPartitionSyncTimeUnit.values()) {
            if (u.name().equalsIgnoreCase(unit)) {
                return Optional.of(u);
            }
        }
        return Optional.empty();
    }
}
@@ -122,10 +122,12 @@ public class MTMVPartitionUtil {
     * @throws AnalysisException
     */
    public static List<AllPartitionDesc> getPartitionDescsByRelatedTable(MTMVRelatedTableIf relatedTable,
-           Map<String, String> tableProperties, String relatedCol) throws AnalysisException {
+           Map<String, String> tableProperties, String relatedCol, Map<String, String> mvProperties)
+           throws AnalysisException {
        HashMap<String, String> partitionProperties = Maps.newHashMap();
        List<AllPartitionDesc> res = Lists.newArrayList();
-       Set<PartitionKeyDesc> relatedPartitionDescs = getRelatedPartitionDescs(relatedTable, relatedCol);
+       Set<PartitionKeyDesc> relatedPartitionDescs = getRelatedPartitionDescs(relatedTable, relatedCol,
+               MTMVUtil.generateMTMVPartitionSyncConfigByProperties(mvProperties));
        for (PartitionKeyDesc partitionKeyDesc : relatedPartitionDescs) {
            SinglePartitionDesc singlePartitionDesc = new SinglePartitionDesc(true,
                    generatePartitionName(partitionKeyDesc),
@@ -137,11 +139,12 @@ public class MTMVPartitionUtil {
        return res;
    }

-   private static Set<PartitionKeyDesc> getRelatedPartitionDescs(MTMVRelatedTableIf relatedTable, String relatedCol)
+   private static Set<PartitionKeyDesc> getRelatedPartitionDescs(MTMVRelatedTableIf relatedTable, String relatedCol,
+           MTMVPartitionSyncConfig config)
            throws AnalysisException {
        int pos = getPos(relatedTable, relatedCol);
        Set<PartitionKeyDesc> res = Sets.newHashSet();
-       for (Entry<Long, PartitionItem> entry : relatedTable.getAndCopyPartitionItems().entrySet()) {
+       for (Entry<Long, PartitionItem> entry : relatedTable.getPartitionItemsByTimeFilter(pos, config).entrySet()) {
            PartitionKeyDesc partitionKeyDesc = entry.getValue().toPartitionKeyDesc(pos);
            res.add(partitionKeyDesc);
        }
@@ -0,0 +1,138 @@
// (standard ASF Apache License 2.0 header, as above)

package org.apache.doris.mtmv;

import org.apache.doris.catalog.Env;
import org.apache.doris.common.util.PropertyAnalyzer;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.qe.ConnectContext;

import com.google.common.collect.Sets;
import org.apache.commons.lang3.StringUtils;

import java.util.Optional;
import java.util.Set;

public class MTMVPropertyUtil {
    public static final Set<String> mvPropertyKeys = Sets.newHashSet(
            PropertyAnalyzer.PROPERTIES_GRACE_PERIOD,
            PropertyAnalyzer.PROPERTIES_EXCLUDED_TRIGGER_TABLES,
            PropertyAnalyzer.PROPERTIES_REFRESH_PARTITION_NUM,
            PropertyAnalyzer.PROPERTIES_WORKLOAD_GROUP,
            PropertyAnalyzer.PROPERTIES_PARTITION_SYNC_LIMIT,
            PropertyAnalyzer.PROPERTIES_PARTITION_TIME_UNIT,
            PropertyAnalyzer.PROPERTIES_PARTITION_DATE_FORMAT
    );

    public static void analyzeProperty(String key, String value) {
        switch (key) {
            case PropertyAnalyzer.PROPERTIES_GRACE_PERIOD:
                analyzeGracePeriod(value);
                break;
            case PropertyAnalyzer.PROPERTIES_REFRESH_PARTITION_NUM:
                analyzeRefreshPartitionNum(value);
                break;
            case PropertyAnalyzer.PROPERTIES_EXCLUDED_TRIGGER_TABLES:
                analyzeExcludedTriggerTables(value);
                break;
            case PropertyAnalyzer.PROPERTIES_WORKLOAD_GROUP:
                analyzeWorkloadGroup(value);
                break;
            case PropertyAnalyzer.PROPERTIES_PARTITION_TIME_UNIT:
                analyzePartitionTimeUnit(value);
                break;
            case PropertyAnalyzer.PROPERTIES_PARTITION_DATE_FORMAT:
                analyzePartitionDateFormat(value);
                break;
            case PropertyAnalyzer.PROPERTIES_PARTITION_SYNC_LIMIT:
                analyzePartitionSyncLimit(value);
                break;
            default:
                throw new AnalysisException("illegal key:" + key);
        }
    }

    private static void analyzePartitionSyncLimit(String value) {
        if (StringUtils.isEmpty(value)) {
            return;
        }
        try {
            Integer.parseInt(value);
        } catch (NumberFormatException e) {
            throw new AnalysisException("valid partition_sync_limit: " + value);
        }
    }

    private static void analyzePartitionDateFormat(String value) {
        // do nothing
    }

    private static void analyzePartitionTimeUnit(String value) {
        if (StringUtils.isEmpty(value)) {
            return;
        }
        Optional<MTMVPartitionSyncTimeUnit> mtmvPartitionSyncTimeUnit = MTMVPartitionSyncTimeUnit
                .fromString(value);
        if (!mtmvPartitionSyncTimeUnit.isPresent()) {
            throw new AnalysisException("valid partition_sync_time_unit: " + value);
        }
    }

    private static void analyzeWorkloadGroup(String value) {
        if (StringUtils.isEmpty(value)) {
            return;
        }
        if (!StringUtils.isEmpty(value) && !Env.getCurrentEnv().getAccessManager()
                .checkWorkloadGroupPriv(ConnectContext.get(), value, PrivPredicate.USAGE)) {
            String message = String
                    .format("Access denied; you need (at least one of) "
                                    + "the %s privilege(s) to use workload group '%s'.",
                            "USAGE/ADMIN", value);
            throw new AnalysisException(message);
        }
    }

    private static void analyzeExcludedTriggerTables(String value) {
        // do nothing
    }

    private static void analyzeGracePeriod(String value) {
        if (StringUtils.isEmpty(value)) {
            return;
        }
        try {
            Long.parseLong(value);
        } catch (NumberFormatException e) {
            throw new AnalysisException("valid grace_period: " + value);
        }
    }

    private static void analyzeRefreshPartitionNum(String value) {
        if (StringUtils.isEmpty(value)) {
            return;
        }
        try {
            Integer.parseInt(value);
        } catch (NumberFormatException e) {
            throw new AnalysisException("valid refresh_partition_num: " + value);
        }
    }
}
@@ -24,8 +24,12 @@ import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;

import com.google.common.collect.Maps;

import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;

/**
@@ -40,6 +44,31 @@ public interface MTMVRelatedTableIf extends TableIf {
     */
    Map<Long, PartitionItem> getAndCopyPartitionItems();

    /**
     * Obtain the list of partitions, filtered by time.
     *
     * @param pos the position of the partition column to check among all partition columns
     * @param config the partition sync config of the MTMV
     * @return the partitions that pass the time filter
     * @throws AnalysisException
     */
    default Map<Long, PartitionItem> getPartitionItemsByTimeFilter(int pos, MTMVPartitionSyncConfig config)
            throws AnalysisException {
        Map<Long, PartitionItem> partitionItems = getAndCopyPartitionItems();
        if (config.getSyncLimit() <= 0) {
            return partitionItems;
        }
        long nowTruncSubSec = MTMVUtil.getNowTruncSubSec(config.getTimeUnit(), config.getSyncLimit());
        Optional<String> dateFormat = config.getDateFormat();
        Map<Long, PartitionItem> res = Maps.newHashMap();
        for (Entry<Long, PartitionItem> entry : partitionItems.entrySet()) {
            if (entry.getValue().isGreaterThanSpecifiedTime(pos, dateFormat, nowTruncSubSec)) {
                res.put(entry.getKey(), entry.getValue());
            }
        }
        return res;
    }

    /**
     * getPartitionType LIST/RANGE/UNPARTITIONED
     *
@@ -92,7 +121,7 @@ public interface MTMVRelatedTableIf extends TableIf {
     * Does the current type of table allow timed triggering
     *
     * @return if this returns false, the method that compares whether to synchronize will directly return true;
     *         otherwise the snapshot information will be compared
     */
    boolean needAutoRefresh();
@@ -25,8 +25,21 @@ import org.apache.doris.catalog.TableIf.TableType;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.util.PropertyAnalyzer;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.functions.executable.DateTimeAcquire;
import org.apache.doris.nereids.trees.expressions.functions.executable.DateTimeArithmetic;
import org.apache.doris.nereids.trees.expressions.functions.executable.DateTimeExtractAndTransform;
import org.apache.doris.nereids.trees.expressions.literal.DateTimeLiteral;
import org.apache.doris.nereids.trees.expressions.literal.IntegerLiteral;
import org.apache.doris.nereids.trees.expressions.literal.VarcharLiteral;

import org.apache.commons.lang3.StringUtils;

import java.util.Map;
import java.util.Optional;
import java.util.Set;

public class MTMVUtil {
@@ -52,7 +65,7 @@ public class MTMVUtil {
    }

    /**
     * whether the base tables of the mtmv contain an external table
     *
     * @param mtmv
     * @return
@@ -66,4 +79,116 @@ public class MTMVUtil {
        }
        return false;
    }

    /**
     * Obtain the minimum second boundary from `syncLimit` `timeUnit`s ago.
     *
     * @param timeUnit
     * @param syncLimit
     * @return
     * @throws AnalysisException
     */
    public static long getNowTruncSubSec(MTMVPartitionSyncTimeUnit timeUnit, int syncLimit)
            throws AnalysisException {
        if (syncLimit < 1) {
            throw new AnalysisException("Unexpected syncLimit, syncLimit: " + syncLimit);
        }
        // get current time
        Expression now = DateTimeAcquire.now();
        if (!(now instanceof DateTimeLiteral)) {
            throw new AnalysisException("now() should return DateTimeLiteral, now: " + now);
        }
        DateTimeLiteral nowLiteral = (DateTimeLiteral) now;
        // date trunc
        now = DateTimeExtractAndTransform
                .dateTrunc(nowLiteral, new VarcharLiteral(timeUnit.name()));
        if (!(now instanceof DateTimeLiteral)) {
            throw new AnalysisException("dateTrunc() should return DateTimeLiteral, now: " + now);
        }
        nowLiteral = (DateTimeLiteral) now;
        // date sub
        if (syncLimit > 1) {
            nowLiteral = dateSub(nowLiteral, timeUnit, syncLimit - 1);
        }
        return ((IntegerLiteral) DateTimeExtractAndTransform.unixTimestamp(nowLiteral)).getValue();
    }

    private static DateTimeLiteral dateSub(
            org.apache.doris.nereids.trees.expressions.literal.DateLiteral date, MTMVPartitionSyncTimeUnit timeUnit,
            int num)
            throws AnalysisException {
        IntegerLiteral integerLiteral = new IntegerLiteral(num);
        Expression result;
        switch (timeUnit) {
            case DAY:
                result = DateTimeArithmetic.dateSub(date, integerLiteral);
                break;
            case YEAR:
                result = DateTimeArithmetic.yearsSub(date, integerLiteral);
                break;
            case MONTH:
                result = DateTimeArithmetic.monthsSub(date, integerLiteral);
                break;
            default:
                throw new AnalysisException("MTMV partition limit not support timeUnit: " + timeUnit.name());
        }
        if (!(result instanceof DateTimeLiteral)) {
            throw new AnalysisException("sub() should return DateTimeLiteral, result: " + result);
        }
        return (DateTimeLiteral) result;
    }

    /**
     * Convert a LiteralExpr to seconds.
     *
     * @param expr
     * @param dateFormatOptional
     * @return
     * @throws AnalysisException
     */
    public static long getExprTimeSec(org.apache.doris.analysis.LiteralExpr expr, Optional<String> dateFormatOptional)
            throws AnalysisException {
        if (expr instanceof org.apache.doris.analysis.MaxLiteral) {
            return Long.MAX_VALUE;
        }
        if (expr instanceof org.apache.doris.analysis.NullLiteral) {
            return Long.MIN_VALUE;
        }
        if (expr instanceof org.apache.doris.analysis.DateLiteral) {
            return ((org.apache.doris.analysis.DateLiteral) expr).unixTimestamp(TimeUtils.getTimeZone()) / 1000;
        }
        if (!dateFormatOptional.isPresent()) {
            throw new AnalysisException("expr is not DateLiteral and DateFormat is not present.");
        }
        String dateFormat = dateFormatOptional.get();
        Expression strToDate = DateTimeExtractAndTransform
                .strToDate(new VarcharLiteral(expr.getStringValue()), new VarcharLiteral(dateFormat));
        if (!(strToDate instanceof DateTimeLiteral)) {
            throw new AnalysisException(
                    String.format("strToDate failed, stringValue: %s, dateFormat: %s", expr.getStringValue(),
                            dateFormat));
        }
        return ((IntegerLiteral) DateTimeExtractAndTransform.unixTimestamp((DateTimeLiteral) strToDate)).getValue();
    }

    /**
     * Generate an MTMVPartitionSyncConfig from mvProperties.
     *
     * @param mvProperties
     * @return
     */
    public static MTMVPartitionSyncConfig generateMTMVPartitionSyncConfigByProperties(
            Map<String, String> mvProperties) {
        int syncLimit = StringUtils.isEmpty(mvProperties.get(PropertyAnalyzer.PROPERTIES_PARTITION_SYNC_LIMIT)) ? -1
                : Integer.parseInt(mvProperties.get(PropertyAnalyzer.PROPERTIES_PARTITION_SYNC_LIMIT));
        MTMVPartitionSyncTimeUnit timeUnit =
                StringUtils.isEmpty(mvProperties.get(PropertyAnalyzer.PROPERTIES_PARTITION_TIME_UNIT))
                        ? MTMVPartitionSyncTimeUnit.DAY : MTMVPartitionSyncTimeUnit
                        .valueOf(mvProperties.get(PropertyAnalyzer.PROPERTIES_PARTITION_TIME_UNIT).toUpperCase());
        Optional<String> dateFormat =
                StringUtils.isEmpty(mvProperties.get(PropertyAnalyzer.PROPERTIES_PARTITION_DATE_FORMAT))
                        ? Optional.empty()
                        : Optional.of(mvProperties.get(PropertyAnalyzer.PROPERTIES_PARTITION_DATE_FORMAT));
        return new MTMVPartitionSyncConfig(syncLimit, timeUnit, dateFormat);
    }
}
@@ -19,13 +19,10 @@ package org.apache.doris.nereids.trees.plans.commands.info;

import org.apache.doris.catalog.Env;
import org.apache.doris.common.UserException;
-import org.apache.doris.common.util.PropertyAnalyzer;
-import org.apache.doris.mysql.privilege.PrivPredicate;
+import org.apache.doris.mtmv.MTMVPropertyUtil;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.qe.ConnectContext;

-import org.apache.commons.lang3.StringUtils;
-
import java.util.Map;
import java.util.Objects;
@@ -55,40 +52,8 @@ public class AlterMTMVPropertyInfo extends AlterMTMVInfo {

    private void analyzeProperties() {
        for (String key : properties.keySet()) {
-           if (PropertyAnalyzer.PROPERTIES_GRACE_PERIOD.equals(key)) {
-               String gracePeriod = properties.get(PropertyAnalyzer.PROPERTIES_GRACE_PERIOD);
-               try {
-                   Long.parseLong(gracePeriod);
-               } catch (NumberFormatException e) {
-                   throw new org.apache.doris.nereids.exceptions.AnalysisException(
-                           "valid grace_period: " + properties.get(PropertyAnalyzer.PROPERTIES_GRACE_PERIOD));
-               }
-           } else if (PropertyAnalyzer.PROPERTIES_REFRESH_PARTITION_NUM.equals(key)) {
-               String refreshPartitionNum = properties.get(PropertyAnalyzer.PROPERTIES_REFRESH_PARTITION_NUM);
-               try {
-                   Integer.parseInt(refreshPartitionNum);
-               } catch (NumberFormatException e) {
-                   throw new AnalysisException(
-                           "valid refresh_partition_num: " + properties
-                                   .get(PropertyAnalyzer.PROPERTIES_REFRESH_PARTITION_NUM));
-               }
-           } else if (PropertyAnalyzer.PROPERTIES_EXCLUDED_TRIGGER_TABLES.equals(key)) {
-               // nothing
-           } else if (PropertyAnalyzer.PROPERTIES_WORKLOAD_GROUP.equals(key)) {
-               String workloadGroup = properties.get(PropertyAnalyzer.PROPERTIES_WORKLOAD_GROUP);
-               if (!StringUtils.isEmpty(workloadGroup) && !Env.getCurrentEnv().getAccessManager()
-                       .checkWorkloadGroupPriv(ConnectContext.get(), workloadGroup, PrivPredicate.USAGE)) {
-                   String message = String
-                           .format("Access denied; you need (at least one of) "
-                                   + "the %s privilege(s) to use workload group '%s'.",
-                                   "USAGE/ADMIN", workloadGroup);
-                   throw new AnalysisException(message);
-               }
-           } else {
-               throw new org.apache.doris.nereids.exceptions.AnalysisException("illegal key:" + key);
-           }
+           MTMVPropertyUtil.analyzeProperty(key, properties.get(key));
        }
    }

    public Map<String, String> getProperties() {
@@ -34,13 +34,13 @@ import org.apache.doris.catalog.View;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.FeNameFormat;
-import org.apache.doris.common.util.PropertyAnalyzer;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.mtmv.EnvInfo;
import org.apache.doris.mtmv.MTMVPartitionInfo;
import org.apache.doris.mtmv.MTMVPartitionInfo.MTMVPartitionType;
import org.apache.doris.mtmv.MTMVPartitionUtil;
import org.apache.doris.mtmv.MTMVPlanUtil;
+import org.apache.doris.mtmv.MTMVPropertyUtil;
import org.apache.doris.mtmv.MTMVRefreshInfo;
import org.apache.doris.mtmv.MTMVRelatedTableIf;
import org.apache.doris.mtmv.MTMVRelation;
@@ -177,46 +177,12 @@ public class CreateMTMVInfo {
    }

    private void analyzeProperties() {
-       if (properties.containsKey(PropertyAnalyzer.PROPERTIES_GRACE_PERIOD)) {
-           String gracePeriod = properties.get(PropertyAnalyzer.PROPERTIES_GRACE_PERIOD);
-           try {
-               Long.parseLong(gracePeriod);
-           } catch (NumberFormatException e) {
-               throw new AnalysisException(
-                       "valid grace_period: " + properties.get(PropertyAnalyzer.PROPERTIES_GRACE_PERIOD));
-           }
-           mvProperties.put(PropertyAnalyzer.PROPERTIES_GRACE_PERIOD, gracePeriod);
-           properties.remove(PropertyAnalyzer.PROPERTIES_GRACE_PERIOD);
-       }
-       if (properties.containsKey(PropertyAnalyzer.PROPERTIES_REFRESH_PARTITION_NUM)) {
-           String refreshPartitionNum = properties.get(PropertyAnalyzer.PROPERTIES_REFRESH_PARTITION_NUM);
-           try {
-               Integer.parseInt(refreshPartitionNum);
-           } catch (NumberFormatException e) {
-               throw new AnalysisException(
-                       "valid refresh_partition_num: " + properties
-                               .get(PropertyAnalyzer.PROPERTIES_REFRESH_PARTITION_NUM));
-           }
-           mvProperties.put(PropertyAnalyzer.PROPERTIES_REFRESH_PARTITION_NUM, refreshPartitionNum);
-           properties.remove(PropertyAnalyzer.PROPERTIES_REFRESH_PARTITION_NUM);
-       }
-       if (properties.containsKey(PropertyAnalyzer.PROPERTIES_EXCLUDED_TRIGGER_TABLES)) {
-           String excludedTriggerTables = properties.get(PropertyAnalyzer.PROPERTIES_EXCLUDED_TRIGGER_TABLES);
-           mvProperties.put(PropertyAnalyzer.PROPERTIES_EXCLUDED_TRIGGER_TABLES, excludedTriggerTables);
-           properties.remove(PropertyAnalyzer.PROPERTIES_EXCLUDED_TRIGGER_TABLES);
-       }
-       if (properties.containsKey(PropertyAnalyzer.PROPERTIES_WORKLOAD_GROUP)) {
-           String workloadGroup = properties.get(PropertyAnalyzer.PROPERTIES_WORKLOAD_GROUP);
-           if (!Env.getCurrentEnv().getAccessManager()
-                   .checkWorkloadGroupPriv(ConnectContext.get(), workloadGroup, PrivPredicate.USAGE)) {
-               String message = String
-                       .format("Access denied;"
-                               + " you need (at least one of) the %s privilege(s) to use workload group '%s'.",
-                               "USAGE/ADMIN", workloadGroup);
-               throw new AnalysisException(message);
-           }
-           mvProperties.put(PropertyAnalyzer.PROPERTIES_WORKLOAD_GROUP, workloadGroup);
-           properties.remove(PropertyAnalyzer.PROPERTIES_WORKLOAD_GROUP);
-       }
+       for (String key : MTMVPropertyUtil.mvPropertyKeys) {
+           if (properties.containsKey(key)) {
+               MTMVPropertyUtil.analyzeProperty(key, properties.get(key));
+               mvProperties.put(key, properties.get(key));
+               properties.remove(key);
+           }
+       }
    }
@@ -324,7 +290,8 @@ public class CreateMTMVInfo {
        List<AllPartitionDesc> allPartitionDescs = null;
        try {
            allPartitionDescs = MTMVPartitionUtil
-                   .getPartitionDescsByRelatedTable(relatedTable, properties, mvPartitionInfo.getRelatedCol());
+                   .getPartitionDescsByRelatedTable(relatedTable, properties, mvPartitionInfo.getRelatedCol(),
+                           mvProperties);
        } catch (org.apache.doris.common.AnalysisException e) {
            throw new AnalysisException("getPartitionDescsByRelatedTable failed", e);
        }
fe/fe-core/src/test/java/org/apache/doris/mtmv/MTMVUtilTest.java (new file, 113 lines)
@@ -0,0 +1,113 @@
// (standard ASF Apache License 2.0 header, as above)

package org.apache.doris.mtmv;

import org.apache.doris.analysis.DateLiteral;
import org.apache.doris.analysis.IntLiteral;
import org.apache.doris.analysis.LiteralExpr;
import org.apache.doris.analysis.StringLiteral;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.util.PropertyAnalyzer;
import org.apache.doris.nereids.trees.expressions.functions.executable.DateTimeAcquire;
import org.apache.doris.nereids.trees.expressions.literal.DateTimeLiteral;

import com.google.common.collect.Maps;
import mockit.Expectations;
import mockit.Mocked;
import org.junit.Assert;
import org.junit.Test;

import java.util.Map;
import java.util.Optional;

public class MTMVUtilTest {
    @Mocked
    private DateTimeAcquire dateTimeAcquire;

    @Test
    public void testGenerateMTMVPartitionSyncConfigByProperties() throws AnalysisException {
        Map<String, String> mvProperties = Maps.newHashMap();
        MTMVPartitionSyncConfig config = MTMVUtil
                .generateMTMVPartitionSyncConfigByProperties(mvProperties);
        Assert.assertEquals(-1, config.getSyncLimit());
        Assert.assertFalse(config.getDateFormat().isPresent());
        Assert.assertEquals(MTMVPartitionSyncTimeUnit.DAY, config.getTimeUnit());

        mvProperties.put(PropertyAnalyzer.PROPERTIES_PARTITION_SYNC_LIMIT, "1");
        config = MTMVUtil.generateMTMVPartitionSyncConfigByProperties(mvProperties);
        Assert.assertEquals(1, config.getSyncLimit());
        Assert.assertFalse(config.getDateFormat().isPresent());
        Assert.assertEquals(MTMVPartitionSyncTimeUnit.DAY, config.getTimeUnit());

        mvProperties.put(PropertyAnalyzer.PROPERTIES_PARTITION_TIME_UNIT, "month");
        config = MTMVUtil.generateMTMVPartitionSyncConfigByProperties(mvProperties);
        Assert.assertEquals(1, config.getSyncLimit());
        Assert.assertFalse(config.getDateFormat().isPresent());
        Assert.assertEquals(MTMVPartitionSyncTimeUnit.MONTH, config.getTimeUnit());

        mvProperties.put(PropertyAnalyzer.PROPERTIES_PARTITION_DATE_FORMAT, "%Y%m%d");
        config = MTMVUtil.generateMTMVPartitionSyncConfigByProperties(mvProperties);
        Assert.assertEquals(1, config.getSyncLimit());
        Assert.assertEquals("%Y%m%d", config.getDateFormat().get());
        Assert.assertEquals(MTMVPartitionSyncTimeUnit.MONTH, config.getTimeUnit());
    }

    @Test
    public void testGetExprTimeSec() throws AnalysisException {
        LiteralExpr expr = new DateLiteral("2020-01-01");
        long exprTimeSec = MTMVUtil.getExprTimeSec(expr, Optional.empty());
        Assert.assertEquals(1577808000L, exprTimeSec);
        expr = new StringLiteral("2020-01-01");
        exprTimeSec = MTMVUtil.getExprTimeSec(expr, Optional.of("%Y-%m-%d"));
        Assert.assertEquals(1577808000L, exprTimeSec);
        expr = new IntLiteral(20200101);
        exprTimeSec = MTMVUtil.getExprTimeSec(expr, Optional.of("%Y%m%d"));
        Assert.assertEquals(1577808000L, exprTimeSec);
        expr = new DateLiteral(Type.DATE, true);
        exprTimeSec = MTMVUtil.getExprTimeSec(expr, Optional.empty());
        Assert.assertEquals(253402185600L, exprTimeSec);
    }

    @Test
    public void testGetNowTruncSubSec() throws AnalysisException {
        DateTimeLiteral dateTimeLiteral = new DateTimeLiteral("2020-02-03 20:10:10");
        new Expectations() {
            {
                dateTimeAcquire.now();
                minTimes = 0;
                result = dateTimeLiteral;
            }
        };
        long nowTruncSubSec = MTMVUtil.getNowTruncSubSec(MTMVPartitionSyncTimeUnit.DAY, 1);
        // 2020-02-03
        Assert.assertEquals(1580659200L, nowTruncSubSec);
        nowTruncSubSec = MTMVUtil.getNowTruncSubSec(MTMVPartitionSyncTimeUnit.MONTH, 1);
        // 2020-02-01
        Assert.assertEquals(1580486400L, nowTruncSubSec);
        nowTruncSubSec = MTMVUtil.getNowTruncSubSec(MTMVPartitionSyncTimeUnit.YEAR, 1);
        // 2020-01-01
        Assert.assertEquals(1577808000L, nowTruncSubSec);
        nowTruncSubSec = MTMVUtil.getNowTruncSubSec(MTMVPartitionSyncTimeUnit.MONTH, 3);
        // 2019-12-01
        Assert.assertEquals(1575129600L, nowTruncSubSec);
        nowTruncSubSec = MTMVUtil.getNowTruncSubSec(MTMVPartitionSyncTimeUnit.DAY, 4);
        // 2020-01-31
        Assert.assertEquals(1580400000L, nowTruncSubSec);
    }
}
@@ -0,0 +1,11 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !select_base_table --
1	bj	20380101
2	sh	20380101
3	bj	20200101
4	sh	20200101

-- !mtmv_complete --
1	20380101	bj
2	20380101	sh

regression-test/data/mtmv_p0/test_limit_partition_mtmv.out (new file, 17 lines)
@@ -0,0 +1,17 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !date_list --
1	2038-01-01

-- !varchar_list --
1	20380101

-- !varchar_list --
1	20380101

-- !date_range --
1	2038-01-02

-- !date_range_all --
1	2038-01-02
2	2020-01-02

@@ -0,0 +1,162 @@
// (standard ASF Apache License 2.0 header, as above)

suite("test_hive_limit_partition_mtmv", "p0,external,hive,external_docker,external_docker_hive") {
    String enabled = context.config.otherConfigs.get("enableHiveTest")
    if (enabled == null || !enabled.equalsIgnoreCase("true")) {
        logger.info("disable Hive test.")
        return;
    }
    // prepare data in hive
    def hive_database = "test_hive_limit_partition_mtmv_db"
    def hive_table = "partition2"

    def drop_table_str = """ drop table if exists ${hive_database}.${hive_table} """
    def drop_database_str = """ drop database if exists ${hive_database}"""
    def create_database_str = """ create database ${hive_database}"""
    def create_table_str = """ CREATE TABLE ${hive_database}.${hive_table} (
                                   `k1` int)
                               PARTITIONED BY (
                                   `region` string,
                                   `day` string
                               )
                               STORED AS ORC;
                           """
    def add_partition_str = """
                                alter table ${hive_database}.${hive_table} add if not exists
                                partition(region="bj",day="20380101")
                                partition(region="sh",day="20380101")
                                partition(region="bj",day="20200101")
                                partition(region="sh",day="20200101")
                            """
    def insert_str1 = """insert into ${hive_database}.${hive_table} PARTITION(region="bj",day="20380101") values(1)"""
    def insert_str2 = """insert into ${hive_database}.${hive_table} PARTITION(region="sh",day="20380101") values(2)"""
    def insert_str3 = """insert into ${hive_database}.${hive_table} PARTITION(region="bj",day="20200101") values(3)"""
    def insert_str4 = """insert into ${hive_database}.${hive_table} PARTITION(region="sh",day="20200101") values(4)"""

    logger.info("hive sql: " + drop_table_str)
    hive_docker """ ${drop_table_str} """
    logger.info("hive sql: " + drop_database_str)
    hive_docker """ ${drop_database_str} """
    logger.info("hive sql: " + create_database_str)
    hive_docker """ ${create_database_str}"""
    logger.info("hive sql: " + create_table_str)
    hive_docker """ ${create_table_str} """
    logger.info("hive sql: " + add_partition_str)
    hive_docker """ ${add_partition_str} """
    logger.info("hive sql: " + insert_str1)
    hive_docker """ ${insert_str1} """
    logger.info("hive sql: " + insert_str2)
    hive_docker """ ${insert_str2} """
    logger.info("hive sql: " + insert_str3)
    hive_docker """ ${insert_str3} """
    logger.info("hive sql: " + insert_str4)
    hive_docker """ ${insert_str4} """

    // prepare catalog
    String hms_port = context.config.otherConfigs.get("hms_port")
    String catalog_name = "test_hive_limit_partition_mtmv_catalog"
    String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")

    sql """drop catalog if exists ${catalog_name}"""
    sql """create catalog if not exists ${catalog_name} properties (
        "type"="hms",
        'hive.metastore.uris' = 'thrift://${externalEnvIp}:${hms_port}'
    );"""

    order_qt_select_base_table "SELECT * FROM ${catalog_name}.${hive_database}.${hive_table}"

    // string type
    def mvName = "test_hive_limit_partition_mtmv"
    def dbName = "regression_test_mtmv_p0"
    sql """drop materialized view if exists ${mvName};"""
    sql """REFRESH catalog ${catalog_name}"""
    sql """
        CREATE MATERIALIZED VIEW ${mvName}
        BUILD DEFERRED REFRESH AUTO ON MANUAL
        partition by(`day`)
        DISTRIBUTED BY RANDOM BUCKETS 2
        PROPERTIES (
            'replication_num' = '1',
            'partition_sync_limit'='2',
            'partition_sync_time_unit'='MONTH',
            'partition_date_format'='%Y%m%d'
        )
        AS
        SELECT k1,day,region FROM ${catalog_name}.${hive_database}.${hive_table};
    """
    def showPartitionsResult = sql """show partitions from ${mvName}"""
    logger.info("showPartitionsResult: " + showPartitionsResult.toString())
    assertEquals(1, showPartitionsResult.size())
    assertTrue(showPartitionsResult.toString().contains("p_20380101"))

    // refresh complete
    sql """
        REFRESH MATERIALIZED VIEW ${mvName} complete
    """
    def jobName = getJobName(dbName, mvName);
    waitingMTMVTaskFinished(jobName)
    order_qt_mtmv_complete "SELECT * FROM ${mvName} order by k1,day,region"

    // date type
    sql """drop materialized view if exists ${mvName};"""
    create_table_str = """ CREATE TABLE ${hive_database}.${hive_table} (
                               `k1` int)
                           PARTITIONED BY (
                               `region` string,
                               `day` date
                           )
                           STORED AS ORC;
                       """
    add_partition_str = """
                            alter table ${hive_database}.${hive_table} add if not exists
                            partition(region="bj",day="2038-01-01")
                            partition(region="sh",day="2038-01-01")
                            partition(region="bj",day="2020-01-01")
                            partition(region="sh",day="2020-01-01")
                        """
    logger.info("hive sql: " + drop_table_str)
    hive_docker """ ${drop_table_str} """
    logger.info("hive sql: " + create_table_str)
    hive_docker """ ${create_table_str} """
    logger.info("hive sql: " + add_partition_str)
    hive_docker """ ${add_partition_str} """

    sql """REFRESH catalog ${catalog_name}"""
    sql """
        CREATE MATERIALIZED VIEW ${mvName}
        BUILD DEFERRED REFRESH AUTO ON MANUAL
        partition by(`day`)
        DISTRIBUTED BY RANDOM BUCKETS 2
        PROPERTIES (
            'replication_num' = '1',
            'partition_sync_limit'='2',
            'partition_sync_time_unit'='YEAR'
        )
        AS
        SELECT k1,day,region FROM ${catalog_name}.${hive_database}.${hive_table};
    """
    showPartitionsResult = sql """show partitions from ${mvName}"""
    logger.info("showPartitionsResult: " + showPartitionsResult.toString())
    assertEquals(1, showPartitionsResult.size())
    assertTrue(showPartitionsResult.toString().contains("p_20380101"))
    sql """drop materialized view if exists ${mvName};"""
    sql """drop catalog if exists ${catalog_name}"""
}
regression-test/suites/mtmv_p0/test_limit_partition_mtmv.groovy (new file, 240 lines)
@@ -0,0 +1,240 @@
// (standard ASF Apache License 2.0 header, as above)

import org.junit.Assert;

suite("test_limit_partition_mtmv") {
    def tableName = "t_test_limit_partition_mtmv_user"
    def mvName = "multi_mv_test_limit_partition_mtmv"
    def dbName = "regression_test_mtmv_p0"

    // list partition, date type
    sql """drop table if exists `${tableName}`"""
    sql """drop materialized view if exists ${mvName};"""
    sql """
        CREATE TABLE `${tableName}` (
            `k1` LARGEINT NOT NULL COMMENT '\"用户id\"',
            `k2` DATE NOT NULL COMMENT '\"数据灌入日期时间\"'
        ) ENGINE=OLAP
        DUPLICATE KEY(`k1`)
        COMMENT 'OLAP'
        PARTITION BY list(`k2`)
        (
            PARTITION p_20380101 VALUES IN ("2038-01-01"),
            PARTITION p_20200101 VALUES IN ("2020-01-01")
        )
        DISTRIBUTED BY HASH(`k1`) BUCKETS 2
        PROPERTIES ('replication_num' = '1') ;
    """
    sql """
        insert into ${tableName} values(1,"2038-01-01"),(2,"2020-01-01");
    """

    sql """
        CREATE MATERIALIZED VIEW ${mvName}
        BUILD DEFERRED REFRESH AUTO ON MANUAL
        partition by(`k2`)
        DISTRIBUTED BY RANDOM BUCKETS 2
        PROPERTIES (
            'replication_num' = '1',
            'partition_sync_limit'='2',
            'partition_sync_time_unit'='YEAR'
        )
        AS
        SELECT * FROM ${tableName};
    """
    def showPartitionsResult = sql """show partitions from ${mvName}"""
    logger.info("showPartitionsResult: " + showPartitionsResult.toString())
    assertEquals(1, showPartitionsResult.size())
    assertTrue(showPartitionsResult.toString().contains("p_20380101"))

    sql """
        REFRESH MATERIALIZED VIEW ${mvName}
    """
    def jobName = getJobName(dbName, mvName);
    log.info(jobName)
    waitingMTMVTaskFinished(jobName)
    order_qt_date_list "SELECT * FROM ${mvName} order by k1,k2"

    // list partition, string type
    sql """drop table if exists `${tableName}`"""
    sql """drop materialized view if exists ${mvName};"""
    sql """
        CREATE TABLE `${tableName}` (
            `k1` LARGEINT NOT NULL COMMENT '\"用户id\"',
            `k2` VARCHAR(100) NOT NULL COMMENT '\"数据灌入日期时间\"'
        ) ENGINE=OLAP
        DUPLICATE KEY(`k1`)
        COMMENT 'OLAP'
        PARTITION BY list(`k2`)
        (
            PARTITION p_20380101 VALUES IN ("20380101"),
            PARTITION p_20200101 VALUES IN ("20200101")
        )
        DISTRIBUTED BY HASH(`k1`) BUCKETS 2
        PROPERTIES ('replication_num' = '1') ;
    """
    sql """
        insert into ${tableName} values(1,"20380101"),(2,"20200101");
    """

    sql """
        CREATE MATERIALIZED VIEW ${mvName}
        BUILD DEFERRED REFRESH AUTO ON MANUAL
        partition by(`k2`)
        DISTRIBUTED BY RANDOM BUCKETS 2
        PROPERTIES (
            'replication_num' = '1',
            'partition_sync_limit'='2',
            'partition_sync_time_unit'='MONTH',
            'partition_date_format'='%Y%m%d'
        )
        AS
        SELECT * FROM ${tableName};
    """
    showPartitionsResult = sql """show partitions from ${mvName}"""
    logger.info("showPartitionsResult: " + showPartitionsResult.toString())
    assertEquals(1, showPartitionsResult.size())
    assertTrue(showPartitionsResult.toString().contains("p_20380101"))

    sql """
        REFRESH MATERIALIZED VIEW ${mvName}
    """
    jobName = getJobName(dbName, mvName);
    log.info(jobName)
    waitingMTMVTaskFinished(jobName)
    order_qt_varchar_list "SELECT * FROM ${mvName} order by k1,k2"

    // list partition, int type
    sql """drop table if exists `${tableName}`"""
    sql """drop materialized view if exists ${mvName};"""
    sql """
        CREATE TABLE `${tableName}` (
            `k1` LARGEINT NOT NULL COMMENT '\"用户id\"',
            `k2` int NOT NULL COMMENT '\"数据灌入日期时间\"'
        ) ENGINE=OLAP
        DUPLICATE KEY(`k1`)
        COMMENT 'OLAP'
        PARTITION BY list(`k2`)
        (
            PARTITION p_20380101 VALUES IN ("20380101"),
            PARTITION p_20200101 VALUES IN ("20200101")
        )
        DISTRIBUTED BY HASH(`k1`) BUCKETS 2
        PROPERTIES ('replication_num' = '1') ;
    """
    sql """
        insert into ${tableName} values(1,20380101),(2,20200101);
    """

    sql """
        CREATE MATERIALIZED VIEW ${mvName}
        BUILD DEFERRED REFRESH AUTO ON MANUAL
        partition by(`k2`)
        DISTRIBUTED BY RANDOM BUCKETS 2
        PROPERTIES (
            'replication_num' = '1',
            'partition_sync_limit'='2',
            'partition_sync_time_unit'='DAY',
            'partition_date_format'='%Y%m%d'
        )
        AS
        SELECT * FROM ${tableName};
    """
    showPartitionsResult = sql """show partitions from ${mvName}"""
    logger.info("showPartitionsResult: " + showPartitionsResult.toString())
    assertEquals(1, showPartitionsResult.size())
    assertTrue(showPartitionsResult.toString().contains("p_20380101"))

    sql """
        REFRESH MATERIALIZED VIEW ${mvName}
    """
    jobName = getJobName(dbName, mvName);
    log.info(jobName)
    waitingMTMVTaskFinished(jobName)
    order_qt_varchar_list "SELECT * FROM ${mvName} order by k1,k2"

    // range partition, date type
    sql """drop table if exists `${tableName}`"""
    sql """drop materialized view if exists ${mvName};"""
    sql """
        CREATE TABLE `${tableName}` (
            `k1` LARGEINT NOT NULL COMMENT '\"用户id\"',
            `k2` DATE NOT NULL COMMENT '\"数据灌入日期时间\"'
        ) ENGINE=OLAP
        DUPLICATE KEY(`k1`)
        COMMENT 'OLAP'
        PARTITION BY range(`k2`)
        (
            PARTITION p2038 VALUES [("2038-01-01"),("2038-01-03")),
            PARTITION p2020 VALUES [("2020-01-01"),("2020-01-03"))
        )
        DISTRIBUTED BY HASH(`k1`) BUCKETS 2
        PROPERTIES ('replication_num' = '1') ;
    """
    sql """
        insert into ${tableName} values(1,"2038-01-02"),(2,"2020-01-02");
    """

    sql """
        CREATE MATERIALIZED VIEW ${mvName}
        BUILD DEFERRED REFRESH AUTO ON MANUAL
        partition by(`k2`)
        DISTRIBUTED BY RANDOM BUCKETS 2
        PROPERTIES (
            'replication_num' = '1',
            'partition_sync_limit'='2',
            'partition_sync_time_unit'='YEAR'
        )
        AS
        SELECT * FROM ${tableName};
    """
    showPartitionsResult = sql """show partitions from ${mvName}"""
    logger.info("showPartitionsResult: " + showPartitionsResult.toString())
    assertEquals(1, showPartitionsResult.size())
    assertTrue(showPartitionsResult.toString().contains("p_20380101_20380103"))

    sql """
        REFRESH MATERIALIZED VIEW ${mvName}
    """
    jobName = getJobName(dbName, mvName);
    log.info(jobName)
    waitingMTMVTaskFinished(jobName)
    order_qt_date_range "SELECT * FROM ${mvName} order by k1,k2"

    // alter
    sql """
        alter Materialized View ${mvName} set("partition_sync_limit"="");
    """
    sql """
        REFRESH MATERIALIZED VIEW ${mvName}
    """
    jobName = getJobName(dbName, mvName);
    log.info(jobName)
    waitingMTMVTaskFinished(jobName)
    showPartitionsResult = sql """show partitions from ${mvName}"""
    logger.info("showPartitionsResult: " + showPartitionsResult.toString())
    assertEquals(2, showPartitionsResult.size())
    assertTrue(showPartitionsResult.toString().contains("p_20380101_20380103"))
    assertTrue(showPartitionsResult.toString().contains("p_20200101_20200103"))
    order_qt_date_range_all "SELECT * FROM ${mvName} order by k1,k2"
}