[feature](mtmv)create mtmv support refresh_partition_num (#28566)
- create/alter mtmv support refresh_partition_num - mtmv task according to refresh_partition_num executes refresh tasks in batches - `tasks` tvf add column `CompletedPartitions` and `progress` - fix mtmv can not `show temp partition` and `drop temp partition` - fix task can not get error msg when insert overwrite error - fix when the partition field is capitalized, the verification of creating a mtmv does not pass
This commit is contained in:
@ -185,6 +185,15 @@ public class MTMV extends OlapTable {
|
||||
}
|
||||
}
|
||||
|
||||
public int getRefreshPartitionNum() {
|
||||
if (mvProperties.containsKey(PropertyAnalyzer.PROPERTIES_REFRESH_PARTITION_NUM)) {
|
||||
int value = Integer.parseInt(mvProperties.get(PropertyAnalyzer.PROPERTIES_REFRESH_PARTITION_NUM));
|
||||
return value < 1 ? MTMVTask.DEFAULT_REFRESH_PARTITION_NUM : value;
|
||||
} else {
|
||||
return MTMVTask.DEFAULT_REFRESH_PARTITION_NUM;
|
||||
}
|
||||
}
|
||||
|
||||
public Set<String> getExcludedTriggerTables() {
|
||||
if (!mvProperties.containsKey(PropertyAnalyzer.PROPERTIES_EXCLUDED_TRIGGER_TABLES)) {
|
||||
return Sets.newHashSet();
|
||||
|
||||
@ -86,7 +86,7 @@ public class TableProcDir implements ProcDirInterface {
|
||||
throw new AnalysisException("Table[" + table.getName() + "] is not a OLAP or ELASTICSEARCH table");
|
||||
}
|
||||
} else if (entryName.equals(TEMP_PARTITIONS)) {
|
||||
if (table.getType() == TableType.OLAP) {
|
||||
if (table instanceof OlapTable) {
|
||||
return new PartitionsProcDir((Database) db, (OlapTable) table, true);
|
||||
} else {
|
||||
throw new AnalysisException("Table[" + table.getName() + "] does not have temp partitions");
|
||||
|
||||
@ -159,6 +159,7 @@ public class PropertyAnalyzer {
|
||||
"enable_duplicate_without_keys_by_default";
|
||||
public static final String PROPERTIES_GRACE_PERIOD = "grace_period";
|
||||
public static final String PROPERTIES_EXCLUDED_TRIGGER_TABLES = "excluded_trigger_tables";
|
||||
public static final String PROPERTIES_REFRESH_PARTITION_NUM = "refresh_partition_num";
|
||||
// For unique key data model, the feature Merge-on-Write will leverage a primary
|
||||
// key index and a delete-bitmap to mark duplicate keys as deleted in load stage,
|
||||
// which can avoid the merging cost in read stage, and accelerate the aggregation
|
||||
|
||||
@ -1707,7 +1707,8 @@ public class InternalCatalog implements CatalogIf<Database> {
|
||||
}
|
||||
|
||||
PartitionInfo partitionInfo = olapTable.getPartitionInfo();
|
||||
if (partitionInfo.getType() != PartitionType.RANGE && partitionInfo.getType() != PartitionType.LIST) {
|
||||
if (partitionInfo.getType() != PartitionType.RANGE && partitionInfo.getType() != PartitionType.LIST
|
||||
&& !isTempPartition) {
|
||||
throw new DdlException("Alter table [" + olapTable.getName() + "] failed. Not a partitioned table");
|
||||
}
|
||||
|
||||
|
||||
@ -39,6 +39,7 @@ import org.apache.doris.nereids.glue.LogicalPlanAdapter;
|
||||
import org.apache.doris.nereids.trees.plans.commands.UpdateMvByPartitionCommand;
|
||||
import org.apache.doris.nereids.trees.plans.commands.info.TableNameInfo;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
import org.apache.doris.qe.QueryState.MysqlStateType;
|
||||
import org.apache.doris.qe.StmtExecutor;
|
||||
import org.apache.doris.thrift.TCell;
|
||||
import org.apache.doris.thrift.TRow;
|
||||
@ -46,6 +47,7 @@ import org.apache.doris.thrift.TUniqueId;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.collect.Sets;
|
||||
import com.google.gson.Gson;
|
||||
@ -54,6 +56,8 @@ import org.apache.commons.collections.CollectionUtils;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.math.BigDecimal;
|
||||
import java.math.RoundingMode;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
@ -62,7 +66,7 @@ import java.util.UUID;
|
||||
|
||||
public class MTMVTask extends AbstractTask {
|
||||
private static final Logger LOG = LogManager.getLogger(MTMVTask.class);
|
||||
public static final Long MAX_HISTORY_TASKS_NUM = 100L;
|
||||
public static final int DEFAULT_REFRESH_PARTITION_NUM = 1;
|
||||
|
||||
public static final ImmutableList<Column> SCHEMA = ImmutableList.of(
|
||||
new Column("TaskId", ScalarType.createStringType()),
|
||||
@ -78,7 +82,9 @@ public class MTMVTask extends AbstractTask {
|
||||
new Column("DurationMs", ScalarType.createStringType()),
|
||||
new Column("TaskContext", ScalarType.createStringType()),
|
||||
new Column("RefreshMode", ScalarType.createStringType()),
|
||||
new Column("RefreshPartitions", ScalarType.createStringType()));
|
||||
new Column("NeedRefreshPartitions", ScalarType.createStringType()),
|
||||
new Column("CompletedPartitions", ScalarType.createStringType()),
|
||||
new Column("Progress", ScalarType.createStringType()));
|
||||
|
||||
public static final ImmutableMap<String, Integer> COLUMN_TO_INDEX;
|
||||
|
||||
@ -97,7 +103,7 @@ public class MTMVTask extends AbstractTask {
|
||||
|
||||
public enum MTMVTaskRefreshMode {
|
||||
COMPLETE,
|
||||
PARTITION,
|
||||
PARTIAL,
|
||||
NOT_REFRESH
|
||||
}
|
||||
|
||||
@ -107,15 +113,16 @@ public class MTMVTask extends AbstractTask {
|
||||
private long mtmvId;
|
||||
@SerializedName("taskContext")
|
||||
private MTMVTaskContext taskContext;
|
||||
@SerializedName("refreshPartitions")
|
||||
List<String> refreshPartitions;
|
||||
@SerializedName("needRefreshPartitions")
|
||||
List<String> needRefreshPartitions;
|
||||
@SerializedName("completedPartitions")
|
||||
List<String> completedPartitions;
|
||||
@SerializedName("refreshMode")
|
||||
MTMVTaskRefreshMode refreshMode;
|
||||
|
||||
private MTMV mtmv;
|
||||
private MTMVRelation relation;
|
||||
private StmtExecutor executor;
|
||||
private Set<Long> refreshPartitionIds = Sets.newHashSet();
|
||||
|
||||
public MTMVTask() {
|
||||
}
|
||||
@ -130,30 +137,51 @@ public class MTMVTask extends AbstractTask {
|
||||
public void run() throws JobException {
|
||||
try {
|
||||
ConnectContext ctx = MTMVPlanUtil.createMTMVContext(mtmv);
|
||||
TUniqueId queryId = generateQueryId();
|
||||
// Every time a task is run, the relation is regenerated because baseTables and baseViews may change,
|
||||
// such as deleting a table and creating a view with the same name
|
||||
relation = MTMVPlanUtil.generateMTMVRelation(mtmv, ctx);
|
||||
calculateRefreshInfo();
|
||||
Map<OlapTable, String> tableWithPartKey = Maps.newHashMap();
|
||||
this.relation = MTMVPlanUtil.generateMTMVRelation(mtmv, ctx);
|
||||
List<Long> needRefreshPartitionIds = calculateNeedRefreshPartitions();
|
||||
this.needRefreshPartitions = MTMVUtil.getPartitionNamesByIds(mtmv, needRefreshPartitionIds);
|
||||
this.refreshMode = generateRefreshMode(needRefreshPartitionIds);
|
||||
if (refreshMode == MTMVTaskRefreshMode.NOT_REFRESH) {
|
||||
return;
|
||||
} else if (refreshMode == MTMVTaskRefreshMode.PARTITION) {
|
||||
OlapTable relatedTable = (OlapTable) MTMVUtil.getTable(mtmv.getMvPartitionInfo().getRelatedTable());
|
||||
tableWithPartKey.put(relatedTable, mtmv.getMvPartitionInfo().getRelatedCol());
|
||||
}
|
||||
refreshPartitions = MTMVUtil.getPartitionNamesByIds(mtmv, refreshPartitionIds);
|
||||
UpdateMvByPartitionCommand command = UpdateMvByPartitionCommand
|
||||
.from(mtmv, refreshPartitionIds, tableWithPartKey);
|
||||
executor = new StmtExecutor(ctx, new LogicalPlanAdapter(command, ctx.getStatementContext()));
|
||||
ctx.setQueryId(queryId);
|
||||
command.run(ctx, executor);
|
||||
Map<OlapTable, String> tableWithPartKey = getIncrementalTableMap();
|
||||
this.completedPartitions = Lists.newArrayList();
|
||||
int refreshPartitionNum = mtmv.getRefreshPartitionNum();
|
||||
long execNum = (needRefreshPartitionIds.size() / refreshPartitionNum) + ((needRefreshPartitionIds.size()
|
||||
% refreshPartitionNum) > 0 ? 1 : 0);
|
||||
for (int i = 0; i < execNum; i++) {
|
||||
int start = i * refreshPartitionNum;
|
||||
int end = start + refreshPartitionNum;
|
||||
Set<Long> execPartitionIds = Sets.newHashSet(needRefreshPartitionIds
|
||||
.subList(start, end > needRefreshPartitionIds.size() ? needRefreshPartitionIds.size() : end));
|
||||
// need get names before exec
|
||||
List<String> execPartitionNames = MTMVUtil.getPartitionNamesByIds(mtmv, execPartitionIds);
|
||||
exec(ctx, execPartitionIds, tableWithPartKey);
|
||||
completedPartitions.addAll(execPartitionNames);
|
||||
}
|
||||
} catch (Throwable e) {
|
||||
LOG.warn("run task failed: ", e);
|
||||
throw new JobException(e);
|
||||
}
|
||||
}
|
||||
|
||||
private void exec(ConnectContext ctx, Set<Long> refreshPartitionIds, Map<OlapTable, String> tableWithPartKey)
|
||||
throws Exception {
|
||||
TUniqueId queryId = generateQueryId();
|
||||
// if SELF_MANAGE mv, only have default partition, will not have partitionItem, so we give empty set
|
||||
UpdateMvByPartitionCommand command = UpdateMvByPartitionCommand
|
||||
.from(mtmv, mtmv.getMvPartitionInfo().getPartitionType() == MTMVPartitionType.FOLLOW_BASE_TABLE
|
||||
? refreshPartitionIds : Sets.newHashSet(), tableWithPartKey);
|
||||
executor = new StmtExecutor(ctx, new LogicalPlanAdapter(command, ctx.getStatementContext()));
|
||||
ctx.setQueryId(queryId);
|
||||
command.run(ctx, executor);
|
||||
if (ctx.getState().getStateType() != MysqlStateType.OK) {
|
||||
throw new JobException(ctx.getState().getErrorMessage());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void onFail() throws JobException {
|
||||
super.onFail();
|
||||
@ -214,10 +242,33 @@ public class MTMVTask extends AbstractTask {
|
||||
new TCell().setStringVal(refreshMode == null ? FeConstants.null_string : refreshMode.toString()));
|
||||
trow.addToColumnValue(
|
||||
new TCell().setStringVal(
|
||||
refreshPartitions == null ? FeConstants.null_string : new Gson().toJson(refreshPartitions)));
|
||||
needRefreshPartitions == null ? FeConstants.null_string : new Gson().toJson(
|
||||
needRefreshPartitions)));
|
||||
trow.addToColumnValue(
|
||||
new TCell().setStringVal(
|
||||
completedPartitions == null ? FeConstants.null_string : new Gson().toJson(
|
||||
completedPartitions)));
|
||||
trow.addToColumnValue(
|
||||
new TCell().setStringVal(getProgress()));
|
||||
return trow;
|
||||
}
|
||||
|
||||
private String getProgress() {
|
||||
if (CollectionUtils.isEmpty(needRefreshPartitions)) {
|
||||
return FeConstants.null_string;
|
||||
}
|
||||
int completedSize = CollectionUtils.isEmpty(completedPartitions) ? 0 : completedPartitions.size();
|
||||
BigDecimal result = new BigDecimal(completedSize * 100)
|
||||
.divide(new BigDecimal(needRefreshPartitions.size()), 2, RoundingMode.HALF_UP);
|
||||
StringBuilder builder = new StringBuilder(result.toString());
|
||||
builder.append("% (");
|
||||
builder.append(completedSize);
|
||||
builder.append("/");
|
||||
builder.append(needRefreshPartitions.size());
|
||||
builder.append(")");
|
||||
return builder.toString();
|
||||
}
|
||||
|
||||
private TUniqueId generateQueryId() {
|
||||
UUID taskId = UUID.randomUUID();
|
||||
return new TUniqueId(taskId.getMostSignificantBits(), taskId.getLeastSignificantBits());
|
||||
@ -229,52 +280,53 @@ public class MTMVTask extends AbstractTask {
|
||||
mtmv = null;
|
||||
relation = null;
|
||||
executor = null;
|
||||
refreshPartitionIds = null;
|
||||
}
|
||||
|
||||
private void calculateRefreshInfo() throws AnalysisException {
|
||||
private Map<OlapTable, String> getIncrementalTableMap() throws AnalysisException {
|
||||
Map<OlapTable, String> tableWithPartKey = Maps.newHashMap();
|
||||
if (mtmv.getMvPartitionInfo().getPartitionType() == MTMVPartitionType.FOLLOW_BASE_TABLE) {
|
||||
OlapTable relatedTable = (OlapTable) MTMVUtil.getTable(mtmv.getMvPartitionInfo().getRelatedTable());
|
||||
tableWithPartKey.put(relatedTable, mtmv.getMvPartitionInfo().getRelatedCol());
|
||||
}
|
||||
return tableWithPartKey;
|
||||
}
|
||||
|
||||
|
||||
private MTMVTaskRefreshMode generateRefreshMode(List<Long> needRefreshPartitionIds) {
|
||||
if (CollectionUtils.isEmpty(needRefreshPartitionIds)) {
|
||||
return MTMVTaskRefreshMode.NOT_REFRESH;
|
||||
} else if (needRefreshPartitionIds.size() == mtmv.getPartitionIds().size()) {
|
||||
return MTMVTaskRefreshMode.COMPLETE;
|
||||
} else {
|
||||
return MTMVTaskRefreshMode.PARTIAL;
|
||||
}
|
||||
}
|
||||
|
||||
private List<Long> calculateNeedRefreshPartitions() throws AnalysisException {
|
||||
// check whether the user manually triggers it
|
||||
if (taskContext.getTriggerMode() == MTMVTaskTriggerMode.MANUAL) {
|
||||
if (taskContext.isComplete()) {
|
||||
this.refreshMode = MTMVTaskRefreshMode.COMPLETE;
|
||||
return;
|
||||
return mtmv.getPartitionIds();
|
||||
} else if (!CollectionUtils
|
||||
.isEmpty(taskContext.getPartitions())) {
|
||||
this.refreshMode = MTMVTaskRefreshMode.PARTITION;
|
||||
this.refreshPartitionIds = MTMVUtil.getPartitionsIdsByNames(mtmv, taskContext.getPartitions());
|
||||
return;
|
||||
return MTMVUtil.getPartitionsIdsByNames(mtmv, taskContext.getPartitions());
|
||||
}
|
||||
}
|
||||
// check if data is fresh
|
||||
Set<String> excludedTriggerTables = mtmv.getExcludedTriggerTables();
|
||||
boolean fresh = MTMVUtil.isMTMVSync(mtmv, relation.getBaseTables(), excludedTriggerTables, 0L);
|
||||
if (fresh) {
|
||||
this.refreshMode = MTMVTaskRefreshMode.NOT_REFRESH;
|
||||
return;
|
||||
return Lists.newArrayList();
|
||||
}
|
||||
// current, if partitionType is SELF_MANAGE, we can only FULL refresh
|
||||
if (mtmv.getMvPartitionInfo().getPartitionType() == MTMVPartitionType.SELF_MANAGE) {
|
||||
this.refreshMode = MTMVTaskRefreshMode.COMPLETE;
|
||||
return;
|
||||
return mtmv.getPartitionIds();
|
||||
}
|
||||
// if refreshMethod is COMPLETE, we only FULL refresh
|
||||
if (mtmv.getRefreshInfo().getRefreshMethod() == RefreshMethod.COMPLETE) {
|
||||
this.refreshMode = MTMVTaskRefreshMode.COMPLETE;
|
||||
return;
|
||||
}
|
||||
OlapTable relatedTable = (OlapTable) MTMVUtil.getTable(mtmv.getMvPartitionInfo().getRelatedTable());
|
||||
excludedTriggerTables.add(relatedTable.getName());
|
||||
// check if every table except relatedTable is fresh
|
||||
Set<Long> mtmvNeedRefreshPartitions = MTMVUtil.getMTMVNeedRefreshPartitions(mtmv);
|
||||
// if true, we can use `Partition`, otherwise must `FULL`
|
||||
if (mtmvNeedRefreshPartitions.size() != mtmv.getPartitionNum()) {
|
||||
this.refreshMode = MTMVTaskRefreshMode.PARTITION;
|
||||
this.refreshPartitionIds = mtmvNeedRefreshPartitions;
|
||||
return;
|
||||
} else {
|
||||
this.refreshMode = MTMVTaskRefreshMode.COMPLETE;
|
||||
return;
|
||||
return mtmv.getPartitionIds();
|
||||
}
|
||||
return MTMVUtil.getMTMVNeedRefreshPartitions(mtmv);
|
||||
}
|
||||
|
||||
public MTMVTaskContext getTaskContext() {
|
||||
|
||||
@ -172,7 +172,7 @@ public class MTMVUtil {
|
||||
return ids;
|
||||
}
|
||||
|
||||
public static List<String> getPartitionNamesByIds(MTMV mtmv, Set<Long> ids) throws AnalysisException {
|
||||
public static List<String> getPartitionNamesByIds(MTMV mtmv, Collection<Long> ids) throws AnalysisException {
|
||||
List<String> res = Lists.newArrayList();
|
||||
for (Long partitionId : ids) {
|
||||
res.add(mtmv.getPartitionOrAnalysisException(partitionId).getName());
|
||||
@ -180,8 +180,8 @@ public class MTMVUtil {
|
||||
return res;
|
||||
}
|
||||
|
||||
public static Set<Long> getPartitionsIdsByNames(MTMV mtmv, List<String> partitions) throws AnalysisException {
|
||||
Set<Long> res = Sets.newHashSet();
|
||||
public static List<Long> getPartitionsIdsByNames(MTMV mtmv, List<String> partitions) throws AnalysisException {
|
||||
List<Long> res = Lists.newArrayList();
|
||||
for (String partitionName : partitions) {
|
||||
Partition partition = mtmv.getPartitionOrAnalysisException(partitionName);
|
||||
res.add(partition.getId());
|
||||
@ -286,9 +286,9 @@ public class MTMVUtil {
|
||||
return res;
|
||||
}
|
||||
|
||||
public static Set<Long> getMTMVNeedRefreshPartitions(MTMV mtmv) {
|
||||
public static List<Long> getMTMVNeedRefreshPartitions(MTMV mtmv) {
|
||||
Collection<Partition> allPartitions = mtmv.getPartitions();
|
||||
Set<Long> res = Sets.newHashSet();
|
||||
List<Long> res = Lists.newArrayList();
|
||||
for (Partition partition : allPartitions) {
|
||||
try {
|
||||
if (!isMTMVPartitionSync(mtmv, partition.getId(), mtmv.getRelation().getBaseTables(),
|
||||
|
||||
@ -60,6 +60,17 @@ public class AlterMTMVPropertyInfo extends AlterMTMVInfo {
|
||||
throw new org.apache.doris.nereids.exceptions.AnalysisException(
|
||||
"valid grace_period: " + properties.get(PropertyAnalyzer.PROPERTIES_GRACE_PERIOD));
|
||||
}
|
||||
} else if (PropertyAnalyzer.PROPERTIES_REFRESH_PARTITION_NUM.equals(key)) {
|
||||
String refreshPartitionNum = properties.get(PropertyAnalyzer.PROPERTIES_REFRESH_PARTITION_NUM);
|
||||
try {
|
||||
Integer.parseInt(refreshPartitionNum);
|
||||
} catch (NumberFormatException e) {
|
||||
throw new AnalysisException(
|
||||
"valid refresh_partition_num: " + properties
|
||||
.get(PropertyAnalyzer.PROPERTIES_REFRESH_PARTITION_NUM));
|
||||
}
|
||||
} else if (PropertyAnalyzer.PROPERTIES_EXCLUDED_TRIGGER_TABLES.equals(key)) {
|
||||
// nothing
|
||||
} else {
|
||||
throw new org.apache.doris.nereids.exceptions.AnalysisException("illegal key:" + key);
|
||||
}
|
||||
|
||||
@ -174,6 +174,18 @@ public class CreateMTMVInfo {
|
||||
mvProperties.put(PropertyAnalyzer.PROPERTIES_GRACE_PERIOD, gracePeriod);
|
||||
properties.remove(PropertyAnalyzer.PROPERTIES_GRACE_PERIOD);
|
||||
}
|
||||
if (properties.containsKey(PropertyAnalyzer.PROPERTIES_REFRESH_PARTITION_NUM)) {
|
||||
String refreshPartitionNum = properties.get(PropertyAnalyzer.PROPERTIES_REFRESH_PARTITION_NUM);
|
||||
try {
|
||||
Integer.parseInt(refreshPartitionNum);
|
||||
} catch (NumberFormatException e) {
|
||||
throw new AnalysisException(
|
||||
"valid refresh_partition_num: " + properties
|
||||
.get(PropertyAnalyzer.PROPERTIES_REFRESH_PARTITION_NUM));
|
||||
}
|
||||
mvProperties.put(PropertyAnalyzer.PROPERTIES_REFRESH_PARTITION_NUM, refreshPartitionNum);
|
||||
properties.remove(PropertyAnalyzer.PROPERTIES_REFRESH_PARTITION_NUM);
|
||||
}
|
||||
if (properties.containsKey(PropertyAnalyzer.PROPERTIES_EXCLUDED_TRIGGER_TABLES)) {
|
||||
String excludedTriggerTables = properties.get(PropertyAnalyzer.PROPERTIES_EXCLUDED_TRIGGER_TABLES);
|
||||
mvProperties.put(PropertyAnalyzer.PROPERTIES_EXCLUDED_TRIGGER_TABLES, excludedTriggerTables);
|
||||
@ -228,9 +240,9 @@ public class CreateMTMVInfo {
|
||||
if (!(followTable instanceof OlapTable)) {
|
||||
throw new AnalysisException("base table for partitioning only can be OlapTable.");
|
||||
}
|
||||
Set<String> partitionColumnNames;
|
||||
Set<String> partitionColumnNames = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER);
|
||||
try {
|
||||
partitionColumnNames = ((OlapTable) followTable).getPartitionColumnNames();
|
||||
partitionColumnNames.addAll(((OlapTable) followTable).getPartitionColumnNames());
|
||||
} catch (DdlException e) {
|
||||
throw new AnalysisException(e.getMessage(), e);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user