[improvement](mtmv)mtmv support partition by hms table (#29989)
This commit is contained in:
@ -678,6 +678,22 @@ update orc_full_acid_par set value = 'BB' where id = 2;
|
||||
alter table orc_full_acid_par PARTITION(part_col=20230101) compact 'major';
|
||||
alter table orc_full_acid_par PARTITION(part_col=20230102) compact 'major';
|
||||
|
||||
create table mtmv_base1 (id INT, value STRING)
|
||||
PARTITIONED BY (part_col INT)
|
||||
CLUSTERED BY (id) INTO 3 BUCKETS
|
||||
STORED AS ORC;
|
||||
|
||||
insert into mtmv_base1 PARTITION(part_col=20230101) values
|
||||
(1, 'A'),
|
||||
(2, 'B'),
|
||||
(3, 'C');
|
||||
|
||||
insert into mtmv_base1 PARTITION(part_col=20230102) values
|
||||
(4, 'D'),
|
||||
(5, 'E'),
|
||||
(6, 'F');
|
||||
|
||||
|
||||
CREATE TABLE `test_different_column_orders_orc`(
|
||||
`name` string,
|
||||
`id` int,
|
||||
|
||||
@ -17,6 +17,9 @@
|
||||
|
||||
package org.apache.doris.catalog;
|
||||
|
||||
import org.apache.doris.analysis.PartitionKeyDesc;
|
||||
import org.apache.doris.analysis.PartitionValue;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
|
||||
import java.io.DataInput;
|
||||
@ -24,6 +27,7 @@ import java.io.DataOutput;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public class ListPartitionItem extends PartitionItem {
|
||||
public static ListPartitionItem DUMMY_ITEM = new ListPartitionItem(Lists.newArrayList());
|
||||
@ -69,6 +73,13 @@ public class ListPartitionItem extends PartitionItem {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public PartitionKeyDesc toPartitionKeyDesc() {
|
||||
List<List<PartitionValue>> inValues = partitionKeys.stream().map(PartitionInfo::toPartitionValue)
|
||||
.collect(Collectors.toList());
|
||||
return PartitionKeyDesc.createIn(inValues);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(DataOutput out) throws IOException {
|
||||
out.writeInt(partitionKeys.size());
|
||||
|
||||
@ -46,6 +46,7 @@ import org.apache.doris.common.io.DeepCopy;
|
||||
import org.apache.doris.common.io.Text;
|
||||
import org.apache.doris.common.util.PropertyAnalyzer;
|
||||
import org.apache.doris.common.util.Util;
|
||||
import org.apache.doris.mtmv.MTMVRelatedTableIf;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
import org.apache.doris.qe.OriginStatement;
|
||||
import org.apache.doris.resource.Tag;
|
||||
@ -99,7 +100,7 @@ import java.util.stream.Collectors;
|
||||
* Internal representation of tableFamilyGroup-related metadata. A OlaptableFamilyGroup contains several tableFamily.
|
||||
* Note: when you add a new olap table property, you should modify TableProperty class
|
||||
*/
|
||||
public class OlapTable extends Table {
|
||||
public class OlapTable extends Table implements MTMVRelatedTableIf {
|
||||
private static final Logger LOG = LogManager.getLogger(OlapTable.class);
|
||||
|
||||
public enum OlapTableState {
|
||||
@ -763,6 +764,7 @@ public class OlapTable extends Table {
|
||||
return partitionInfo;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<String> getPartitionColumnNames() throws DdlException {
|
||||
Set<String> partitionColumnNames = Sets.newHashSet();
|
||||
if (partitionInfo instanceof SinglePartitionInfo) {
|
||||
@ -2499,4 +2501,37 @@ public class OlapTable extends Table {
|
||||
}
|
||||
return tablets;
|
||||
}
|
||||
|
||||
@Override
|
||||
public PartitionType getPartitionType() {
|
||||
return partitionInfo.getType();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<Long, PartitionItem> getPartitionItems() {
|
||||
return getPartitionInfo().getIdToItem(false);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getPartitionLastModifyTime(long partitionId, PartitionItem item) throws AnalysisException {
|
||||
return getPartitionOrAnalysisException(partitionId).getVisibleVersionTimeIgnoreInit();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getLastModifyTime() {
|
||||
long result = 0L;
|
||||
long visibleVersionTime;
|
||||
for (Partition partition : getAllPartitions()) {
|
||||
visibleVersionTime = partition.getVisibleVersionTimeIgnoreInit();
|
||||
if (visibleVersionTime > result) {
|
||||
result = visibleVersionTime;
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Column> getPartitionColumns() {
|
||||
return getPartitionInfo().getPartitionColumns();
|
||||
}
|
||||
}
|
||||
|
||||
@ -366,7 +366,7 @@ public class PartitionInfo implements Writable {
|
||||
throw new RuntimeException("Should implement it in derived classes.");
|
||||
}
|
||||
|
||||
static List<PartitionValue> toPartitionValue(PartitionKey partitionKey) {
|
||||
public static List<PartitionValue> toPartitionValue(PartitionKey partitionKey) {
|
||||
return partitionKey.getKeys().stream().map(expr -> {
|
||||
if (expr == MaxLiteral.MAX_VALUE) {
|
||||
return PartitionValue.MAX_VALUE;
|
||||
|
||||
@ -17,6 +17,7 @@
|
||||
|
||||
package org.apache.doris.catalog;
|
||||
|
||||
import org.apache.doris.analysis.PartitionKeyDesc;
|
||||
import org.apache.doris.common.io.Writable;
|
||||
|
||||
import java.util.Comparator;
|
||||
@ -34,4 +35,5 @@ public abstract class PartitionItem implements Comparable<PartitionItem>, Writab
|
||||
return false;
|
||||
}
|
||||
|
||||
public abstract PartitionKeyDesc toPartitionKeyDesc();
|
||||
}
|
||||
|
||||
@ -142,6 +142,10 @@ public class PartitionKey implements Comparable<PartitionKey>, Writable {
|
||||
partitionKey.originHiveKeys.add(values.get(i).getStringValue());
|
||||
}
|
||||
partitionKey.types.add(types.get(i).getPrimitiveType());
|
||||
//If there is one default value, set `isDefaultListPartitionKey` to true
|
||||
if (values.get(i).isHiveDefaultPartition()) {
|
||||
partitionKey.setDefaultListPartition(true);
|
||||
}
|
||||
}
|
||||
if (values.isEmpty()) {
|
||||
for (int i = 0; i < types.size(); ++i) {
|
||||
|
||||
@ -17,6 +17,7 @@
|
||||
|
||||
package org.apache.doris.catalog;
|
||||
|
||||
import org.apache.doris.analysis.PartitionKeyDesc;
|
||||
import org.apache.doris.common.util.RangeUtils;
|
||||
|
||||
import com.google.common.collect.Range;
|
||||
@ -45,6 +46,13 @@ public class RangePartitionItem extends PartitionItem {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public PartitionKeyDesc toPartitionKeyDesc() {
|
||||
return PartitionKeyDesc.createFixed(
|
||||
PartitionInfo.toPartitionValue(partitionKeyRange.lowerEndpoint()),
|
||||
PartitionInfo.toPartitionValue(partitionKeyRange.upperEndpoint()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(DataOutput out) throws IOException {
|
||||
RangeUtils.writeRange(out, partitionKeyRange);
|
||||
|
||||
@ -21,6 +21,9 @@ import org.apache.doris.catalog.Column;
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.catalog.HiveMetaStoreClientHelper;
|
||||
import org.apache.doris.catalog.HudiUtils;
|
||||
import org.apache.doris.catalog.ListPartitionItem;
|
||||
import org.apache.doris.catalog.PartitionItem;
|
||||
import org.apache.doris.catalog.PartitionType;
|
||||
import org.apache.doris.catalog.PrimitiveType;
|
||||
import org.apache.doris.catalog.ScalarType;
|
||||
import org.apache.doris.catalog.Type;
|
||||
@ -28,6 +31,8 @@ import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.datasource.HMSExternalCatalog;
|
||||
import org.apache.doris.datasource.hive.HMSCachedClient;
|
||||
import org.apache.doris.datasource.hive.HiveMetaStoreCache;
|
||||
import org.apache.doris.datasource.hive.HivePartition;
|
||||
import org.apache.doris.mtmv.MTMVRelatedTableIf;
|
||||
import org.apache.doris.nereids.exceptions.NotSupportedException;
|
||||
import org.apache.doris.statistics.AnalysisInfo;
|
||||
import org.apache.doris.statistics.BaseAnalysisTask;
|
||||
@ -71,6 +76,7 @@ import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
@ -78,7 +84,7 @@ import java.util.stream.Collectors;
|
||||
/**
|
||||
* Hive metastore external table.
|
||||
*/
|
||||
public class HMSExternalTable extends ExternalTable {
|
||||
public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableIf {
|
||||
private static final Logger LOG = LogManager.getLogger(HMSExternalTable.class);
|
||||
|
||||
private static final Set<String> SUPPORTED_HIVE_FILE_FORMATS;
|
||||
@ -257,6 +263,7 @@ public class HMSExternalTable extends ExternalTable {
|
||||
return partitionColumns.stream().map(c -> c.getType()).collect(Collectors.toList());
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Column> getPartitionColumns() {
|
||||
makeSureInitialized();
|
||||
getFullSchema();
|
||||
@ -778,6 +785,58 @@ public class HMSExternalTable extends ExternalTable {
|
||||
return getRemoteTable().getSd().getBucketCols().stream().map(String::toLowerCase)
|
||||
.collect(Collectors.toSet());
|
||||
}
|
||||
|
||||
@Override
|
||||
public PartitionType getPartitionType() {
|
||||
return getPartitionColumns().size() > 0 ? PartitionType.LIST : PartitionType.UNPARTITIONED;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<String> getPartitionColumnNames() {
|
||||
return getPartitionColumns().stream()
|
||||
.map(c -> c.getName().toLowerCase()).collect(Collectors.toSet());
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<Long, PartitionItem> getPartitionItems() {
|
||||
HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
|
||||
.getMetaStoreCache((HMSExternalCatalog) getCatalog());
|
||||
HiveMetaStoreCache.HivePartitionValues hivePartitionValues = cache.getPartitionValues(
|
||||
getDbName(), getName(), getPartitionColumnTypes());
|
||||
|
||||
return hivePartitionValues.getIdToPartitionItem().entrySet().stream()
|
||||
.filter(entry -> !entry.getValue().isDefaultPartition())
|
||||
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getPartitionLastModifyTime(long partitionId, PartitionItem item) throws AnalysisException {
|
||||
List<List<String>> partitionValuesList = Lists.newArrayListWithCapacity(1);
|
||||
partitionValuesList.add(
|
||||
((ListPartitionItem) item).getItems().get(0).getPartitionValuesAsStringListForHive());
|
||||
HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
|
||||
.getMetaStoreCache((HMSExternalCatalog) getCatalog());
|
||||
List<HivePartition> resPartitions = cache.getAllPartitionsWithCache(getDbName(), getName(),
|
||||
partitionValuesList);
|
||||
if (resPartitions.size() != 1) {
|
||||
throw new AnalysisException("partition not normal, size: " + resPartitions.size());
|
||||
}
|
||||
return resPartitions.get(0).getLastModifiedTimeIgnoreInit();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getLastModifyTime() throws AnalysisException {
|
||||
|
||||
long result = 0L;
|
||||
long visibleVersionTime;
|
||||
for (Entry<Long, PartitionItem> entry : getPartitionItems().entrySet()) {
|
||||
visibleVersionTime = getPartitionLastModifyTime(entry.getKey(), entry.getValue());
|
||||
if (visibleVersionTime > result) {
|
||||
result = visibleVersionTime;
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
@ -300,7 +300,10 @@ public class HiveMetaStoreCache {
|
||||
}
|
||||
try {
|
||||
PartitionKey key = PartitionKey.createListPartitionKeyWithTypes(values, types, true);
|
||||
return new ListPartitionItem(Lists.newArrayList(key));
|
||||
ListPartitionItem listPartitionItem = new ListPartitionItem(Lists.newArrayList(key));
|
||||
// if `PartitionKey` is default, set `PartitionItem` to default
|
||||
listPartitionItem.setDefaultPartition(key.isHiveDefaultPartition());
|
||||
return listPartitionItem;
|
||||
} catch (AnalysisException e) {
|
||||
throw new CacheException("failed to convert hive partition %s to list partition in catalog %s",
|
||||
e, partitionName, catalog.getName());
|
||||
@ -315,7 +318,8 @@ public class HiveMetaStoreCache {
|
||||
sd.getInputFormat(), sd.getLocation(), key, catalog.getName());
|
||||
}
|
||||
// TODO: more info?
|
||||
return new HivePartition(key.dbName, key.tblName, false, sd.getInputFormat(), sd.getLocation(), key.values);
|
||||
return new HivePartition(key.dbName, key.tblName, false, sd.getInputFormat(), sd.getLocation(), key.values,
|
||||
partition.getParameters());
|
||||
}
|
||||
|
||||
private Map<PartitionCacheKey, HivePartition> loadPartitions(Iterable<? extends PartitionCacheKey> keys) {
|
||||
@ -348,7 +352,7 @@ public class HiveMetaStoreCache {
|
||||
StorageDescriptor sd = partition.getSd();
|
||||
ret.put(new PartitionCacheKey(dbName, tblName, partition.getValues()),
|
||||
new HivePartition(dbName, tblName, false,
|
||||
sd.getInputFormat(), sd.getLocation(), partition.getValues()));
|
||||
sd.getInputFormat(), sd.getLocation(), partition.getValues(), partition.getParameters()));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -23,18 +23,23 @@ import com.google.common.base.Preconditions;
|
||||
import lombok.Data;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
@Data
|
||||
public class HivePartition {
|
||||
public static final String LAST_MODIFY_TIME_KEY = "transient_lastDdlTime";
|
||||
public static final String FILE_NUM_KEY = "numFiles";
|
||||
|
||||
private String dbName;
|
||||
private String tblName;
|
||||
private String inputFormat;
|
||||
private String path;
|
||||
private List<String> partitionValues;
|
||||
private boolean isDummyPartition;
|
||||
private Map<String, String> parameters;
|
||||
|
||||
public HivePartition(String dbName, String tblName, boolean isDummyPartition,
|
||||
String inputFormat, String path, List<String> partitionValues) {
|
||||
String inputFormat, String path, List<String> partitionValues, Map<String, String> parameters) {
|
||||
this.dbName = dbName;
|
||||
this.tblName = tblName;
|
||||
this.isDummyPartition = isDummyPartition;
|
||||
@ -44,6 +49,7 @@ public class HivePartition {
|
||||
this.path = path;
|
||||
// eg: cn, beijing
|
||||
this.partitionValues = partitionValues;
|
||||
this.parameters = parameters;
|
||||
}
|
||||
|
||||
// return partition name like: nation=cn/city=beijing
|
||||
@ -63,6 +69,31 @@ public class HivePartition {
|
||||
return this.isDummyPartition;
|
||||
}
|
||||
|
||||
public long getLastModifiedTime() {
|
||||
if (parameters == null || !parameters.containsKey(LAST_MODIFY_TIME_KEY)) {
|
||||
return 0L;
|
||||
}
|
||||
return Long.parseLong(parameters.get(LAST_MODIFY_TIME_KEY)) * 1000;
|
||||
}
|
||||
|
||||
/**
|
||||
* If there are no files, it proves that there is no data under the partition, we return 0
|
||||
* @return
|
||||
*/
|
||||
public long getLastModifiedTimeIgnoreInit() {
|
||||
if (getFileNum() == 0) {
|
||||
return 0L;
|
||||
}
|
||||
return getLastModifiedTime();
|
||||
}
|
||||
|
||||
public long getFileNum() {
|
||||
if (parameters == null || !parameters.containsKey(FILE_NUM_KEY)) {
|
||||
return 0L;
|
||||
}
|
||||
return Long.parseLong(parameters.get(FILE_NUM_KEY));
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "HivePartition{"
|
||||
|
||||
@ -21,9 +21,10 @@ import org.apache.doris.catalog.Column;
|
||||
import org.apache.doris.catalog.Database;
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.catalog.MTMV;
|
||||
import org.apache.doris.catalog.OlapTable;
|
||||
import org.apache.doris.catalog.ScalarType;
|
||||
import org.apache.doris.catalog.TableIf;
|
||||
import org.apache.doris.catalog.TableIf.TableType;
|
||||
import org.apache.doris.catalog.external.HMSExternalTable;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.common.DdlException;
|
||||
import org.apache.doris.common.FeConstants;
|
||||
@ -33,6 +34,7 @@ import org.apache.doris.common.util.DebugUtil;
|
||||
import org.apache.doris.common.util.TimeUtils;
|
||||
import org.apache.doris.job.exception.JobException;
|
||||
import org.apache.doris.job.task.AbstractTask;
|
||||
import org.apache.doris.mtmv.BaseTableInfo;
|
||||
import org.apache.doris.mtmv.MTMVPartitionInfo.MTMVPartitionType;
|
||||
import org.apache.doris.mtmv.MTMVPlanUtil;
|
||||
import org.apache.doris.mtmv.MTMVRefreshEnum.RefreshMethod;
|
||||
@ -149,13 +151,18 @@ public class MTMVTask extends AbstractTask {
|
||||
// Every time a task is run, the relation is regenerated because baseTables and baseViews may change,
|
||||
// such as deleting a table and creating a view with the same name
|
||||
this.relation = MTMVPlanUtil.generateMTMVRelation(mtmv, ctx);
|
||||
// Before obtaining information from hmsTable, refresh to ensure that the data is up-to-date
|
||||
refreshHmsTable();
|
||||
if (mtmv.getMvPartitionInfo().getPartitionType() == MTMVPartitionType.FOLLOW_BASE_TABLE) {
|
||||
MTMVUtil.alignMvPartition(mtmv, mtmv.getMvPartitionInfo().getRelatedTable());
|
||||
}
|
||||
List<Long> needRefreshPartitionIds = calculateNeedRefreshPartitions();
|
||||
this.needRefreshPartitions = MTMVUtil.getPartitionNamesByIds(mtmv, needRefreshPartitionIds);
|
||||
this.refreshMode = generateRefreshMode(needRefreshPartitionIds);
|
||||
if (refreshMode == MTMVTaskRefreshMode.NOT_REFRESH) {
|
||||
return;
|
||||
}
|
||||
Map<OlapTable, String> tableWithPartKey = getIncrementalTableMap();
|
||||
Map<TableIf, String> tableWithPartKey = getIncrementalTableMap();
|
||||
this.completedPartitions = Lists.newArrayList();
|
||||
int refreshPartitionNum = mtmv.getRefreshPartitionNum();
|
||||
long execNum = (needRefreshPartitionIds.size() / refreshPartitionNum) + ((needRefreshPartitionIds.size()
|
||||
@ -176,7 +183,8 @@ public class MTMVTask extends AbstractTask {
|
||||
}
|
||||
}
|
||||
|
||||
private void exec(ConnectContext ctx, Set<Long> refreshPartitionIds, Map<OlapTable, String> tableWithPartKey)
|
||||
private void exec(ConnectContext ctx, Set<Long> refreshPartitionIds,
|
||||
Map<TableIf, String> tableWithPartKey)
|
||||
throws Exception {
|
||||
TUniqueId queryId = generateQueryId();
|
||||
lastQueryId = DebugUtil.printId(queryId);
|
||||
@ -223,16 +231,25 @@ public class MTMVTask extends AbstractTask {
|
||||
super.before();
|
||||
try {
|
||||
mtmv = getMTMV();
|
||||
if (mtmv.getMvPartitionInfo().getPartitionType() == MTMVPartitionType.FOLLOW_BASE_TABLE) {
|
||||
OlapTable relatedTable = (OlapTable) MTMVUtil.getTable(mtmv.getMvPartitionInfo().getRelatedTable());
|
||||
MTMVUtil.alignMvPartition(mtmv, relatedTable);
|
||||
}
|
||||
} catch (UserException e) {
|
||||
LOG.warn("before task failed:", e);
|
||||
throw new JobException(e);
|
||||
}
|
||||
}
|
||||
|
||||
private void refreshHmsTable() throws AnalysisException, DdlException {
|
||||
for (BaseTableInfo tableInfo : relation.getBaseTables()) {
|
||||
TableIf tableIf = MTMVUtil.getTable(tableInfo);
|
||||
if (tableIf instanceof HMSExternalTable) {
|
||||
HMSExternalTable hmsTable = (HMSExternalTable) tableIf;
|
||||
Env.getCurrentEnv().getCatalogMgr()
|
||||
.refreshExternalTable(hmsTable.getDbName(), hmsTable.getName(), hmsTable.getCatalog().getName(),
|
||||
true);
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
private MTMV getMTMV() throws DdlException, MetaNotFoundException {
|
||||
Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(dbId);
|
||||
return (MTMV) db.getTableOrMetaException(mtmvId, TableType.MATERIALIZED_VIEW);
|
||||
@ -331,16 +348,15 @@ public class MTMVTask extends AbstractTask {
|
||||
executor = null;
|
||||
}
|
||||
|
||||
private Map<OlapTable, String> getIncrementalTableMap() throws AnalysisException {
|
||||
Map<OlapTable, String> tableWithPartKey = Maps.newHashMap();
|
||||
private Map<TableIf, String> getIncrementalTableMap() throws AnalysisException {
|
||||
Map<TableIf, String> tableWithPartKey = Maps.newHashMap();
|
||||
if (mtmv.getMvPartitionInfo().getPartitionType() == MTMVPartitionType.FOLLOW_BASE_TABLE) {
|
||||
OlapTable relatedTable = (OlapTable) MTMVUtil.getTable(mtmv.getMvPartitionInfo().getRelatedTable());
|
||||
tableWithPartKey.put(relatedTable, mtmv.getMvPartitionInfo().getRelatedCol());
|
||||
tableWithPartKey
|
||||
.put(mtmv.getMvPartitionInfo().getRelatedTable(), mtmv.getMvPartitionInfo().getRelatedCol());
|
||||
}
|
||||
return tableWithPartKey;
|
||||
}
|
||||
|
||||
|
||||
private MTMVTaskRefreshMode generateRefreshMode(List<Long> needRefreshPartitionIds) {
|
||||
if (CollectionUtils.isEmpty(needRefreshPartitionIds)) {
|
||||
return MTMVTaskRefreshMode.NOT_REFRESH;
|
||||
@ -362,8 +378,9 @@ public class MTMVTask extends AbstractTask {
|
||||
}
|
||||
}
|
||||
// check if data is fresh
|
||||
Set<String> excludedTriggerTables = mtmv.getExcludedTriggerTables();
|
||||
boolean fresh = MTMVUtil.isMTMVSync(mtmv, relation.getBaseTables(), excludedTriggerTables, 0L);
|
||||
// We need to use a newly generated relationship and cannot retrieve it using mtmv.getRelation()
|
||||
// to avoid rebuilding the baseTable and causing a change in the tableId
|
||||
boolean fresh = MTMVUtil.isMTMVSync(mtmv, relation.getBaseTables(), mtmv.getExcludedTriggerTables(), 0L);
|
||||
if (fresh) {
|
||||
return Lists.newArrayList();
|
||||
}
|
||||
@ -375,7 +392,9 @@ public class MTMVTask extends AbstractTask {
|
||||
if (mtmv.getRefreshInfo().getRefreshMethod() == RefreshMethod.COMPLETE) {
|
||||
return mtmv.getPartitionIds();
|
||||
}
|
||||
return MTMVUtil.getMTMVNeedRefreshPartitions(mtmv);
|
||||
// We need to use a newly generated relationship and cannot retrieve it using mtmv.getRelation()
|
||||
// to avoid rebuilding the baseTable and causing a change in the tableId
|
||||
return MTMVUtil.getMTMVNeedRefreshPartitions(mtmv, relation.getBaseTables());
|
||||
}
|
||||
|
||||
public MTMVTaskContext getTaskContext() {
|
||||
|
||||
@ -17,6 +17,8 @@
|
||||
|
||||
package org.apache.doris.mtmv;
|
||||
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
|
||||
import com.google.gson.annotations.SerializedName;
|
||||
|
||||
/**
|
||||
@ -59,10 +61,14 @@ public class MTMVPartitionInfo {
|
||||
this.partitionType = partitionType;
|
||||
}
|
||||
|
||||
public BaseTableInfo getRelatedTable() {
|
||||
public BaseTableInfo getRelatedTableInfo() {
|
||||
return relatedTable;
|
||||
}
|
||||
|
||||
public MTMVRelatedTableIf getRelatedTable() throws AnalysisException {
|
||||
return (MTMVRelatedTableIf) MTMVUtil.getTable(relatedTable);
|
||||
}
|
||||
|
||||
public void setRelatedTable(BaseTableInfo relatedTable) {
|
||||
this.relatedTable = relatedTable;
|
||||
}
|
||||
|
||||
@ -89,7 +89,8 @@ public class MTMVPlanUtil {
|
||||
private static Set<BaseTableInfo> getBaseTables(Plan plan) {
|
||||
TableCollectorContext collectorContext =
|
||||
new TableCollector.TableCollectorContext(
|
||||
com.google.common.collect.Sets.newHashSet(TableType.MATERIALIZED_VIEW, TableType.OLAP));
|
||||
com.google.common.collect.Sets
|
||||
.newHashSet(TableType.values()));
|
||||
plan.accept(TableCollector.INSTANCE, collectorContext);
|
||||
List<TableIf> collectedTables = collectorContext.getCollectedTables();
|
||||
return transferTableIfToInfo(collectedTables);
|
||||
|
||||
@ -0,0 +1,82 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.mtmv;
|
||||
|
||||
import org.apache.doris.catalog.Column;
|
||||
import org.apache.doris.catalog.PartitionItem;
|
||||
import org.apache.doris.catalog.PartitionType;
|
||||
import org.apache.doris.catalog.TableIf;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.common.DdlException;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* The table that implements this interface can serve as a partition table followed by MTMV
|
||||
*/
|
||||
public interface MTMVRelatedTableIf extends TableIf {
|
||||
|
||||
/**
|
||||
* Get all partitions of the table
|
||||
*
|
||||
* @return partitionId->PartitionItem
|
||||
*/
|
||||
Map<Long, PartitionItem> getPartitionItems();
|
||||
|
||||
/**
|
||||
* Obtain the latest update time of partition data
|
||||
*
|
||||
* @param partitionId
|
||||
* @param item
|
||||
* @return millisecond
|
||||
* @throws AnalysisException
|
||||
*/
|
||||
long getPartitionLastModifyTime(long partitionId, PartitionItem item) throws AnalysisException;
|
||||
|
||||
/**
|
||||
* getPartitionType LIST/RANGE/UNPARTITIONED
|
||||
*
|
||||
* @return
|
||||
*/
|
||||
PartitionType getPartitionType();
|
||||
|
||||
/**
|
||||
* getPartitionColumnNames
|
||||
*
|
||||
* @return
|
||||
* @throws DdlException
|
||||
*/
|
||||
Set<String> getPartitionColumnNames() throws DdlException;
|
||||
|
||||
/**
|
||||
* Obtain the latest update time of table data
|
||||
*
|
||||
* @return
|
||||
* @throws AnalysisException
|
||||
*/
|
||||
long getLastModifyTime() throws AnalysisException;
|
||||
|
||||
/**
|
||||
* getPartitionColumns
|
||||
*
|
||||
* @return
|
||||
*/
|
||||
List<Column> getPartitionColumns();
|
||||
}
|
||||
@ -18,7 +18,6 @@
|
||||
package org.apache.doris.mtmv;
|
||||
|
||||
import org.apache.doris.catalog.MTMV;
|
||||
import org.apache.doris.catalog.OlapTable;
|
||||
import org.apache.doris.catalog.Table;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.common.DdlException;
|
||||
@ -86,8 +85,7 @@ public class MTMVService {
|
||||
public void createMTMV(MTMV mtmv) throws DdlException, AnalysisException {
|
||||
Objects.requireNonNull(mtmv);
|
||||
if (mtmv.getMvPartitionInfo().getPartitionType() == MTMVPartitionType.FOLLOW_BASE_TABLE) {
|
||||
OlapTable relatedTable = (OlapTable) MTMVUtil.getTable(mtmv.getMvPartitionInfo().getRelatedTable());
|
||||
MTMVUtil.alignMvPartition(mtmv, relatedTable);
|
||||
MTMVUtil.alignMvPartition(mtmv, mtmv.getMvPartitionInfo().getRelatedTable());
|
||||
}
|
||||
LOG.info("createMTMV: " + mtmv.getName());
|
||||
for (MTMVHookService mtmvHookService : hooks.values()) {
|
||||
|
||||
@ -19,13 +19,11 @@ package org.apache.doris.mtmv;
|
||||
|
||||
import org.apache.doris.analysis.AddPartitionClause;
|
||||
import org.apache.doris.analysis.DropPartitionClause;
|
||||
import org.apache.doris.analysis.PartitionDesc;
|
||||
import org.apache.doris.analysis.PartitionKeyDesc;
|
||||
import org.apache.doris.analysis.SinglePartitionDesc;
|
||||
import org.apache.doris.catalog.Database;
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.catalog.MTMV;
|
||||
import org.apache.doris.catalog.OlapTable;
|
||||
import org.apache.doris.catalog.Partition;
|
||||
import org.apache.doris.catalog.PartitionItem;
|
||||
import org.apache.doris.catalog.TableIf;
|
||||
@ -44,7 +42,6 @@ import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
@ -69,20 +66,6 @@ public class MTMVUtil {
|
||||
return table;
|
||||
}
|
||||
|
||||
/**
|
||||
* Determine whether the mtmv is sync with tables
|
||||
*
|
||||
* @param mtmv
|
||||
* @param tables
|
||||
* @param excludedTriggerTables
|
||||
* @param gracePeriod
|
||||
* @return
|
||||
*/
|
||||
public static boolean isMTMVSync(MTMV mtmv, Set<BaseTableInfo> tables,
|
||||
Set<String> excludedTriggerTables, Long gracePeriod) {
|
||||
return isSync(getTableMinVisibleVersionTime(mtmv), tables, excludedTriggerTables, gracePeriod);
|
||||
}
|
||||
|
||||
/**
|
||||
* Determine whether the partition is sync with retated partition and other baseTables
|
||||
*
|
||||
@ -94,23 +77,25 @@ public class MTMVUtil {
|
||||
* @return
|
||||
* @throws AnalysisException
|
||||
*/
|
||||
public static boolean isMTMVPartitionSync(MTMV mtmv, Long partitionId, Set<BaseTableInfo> tables,
|
||||
private static boolean isMTMVPartitionSync(MTMV mtmv, Long partitionId, Set<BaseTableInfo> tables,
|
||||
Set<String> excludedTriggerTables, Long gracePeriod) throws AnalysisException {
|
||||
boolean isSyncWithPartition = true;
|
||||
if (mtmv.getMvPartitionInfo().getPartitionType() == MTMVPartitionType.FOLLOW_BASE_TABLE) {
|
||||
OlapTable relatedTable = (OlapTable) getTable(mtmv.getMvPartitionInfo().getRelatedTable());
|
||||
MTMVRelatedTableIf relatedTable = mtmv.getMvPartitionInfo().getRelatedTable();
|
||||
// if follow base table, not need compare with related table, only should compare with related partition
|
||||
excludedTriggerTables.add(relatedTable.getName());
|
||||
PartitionItem item = mtmv.getPartitionInfo().getItemOrAnalysisException(partitionId);
|
||||
Map<Long, PartitionItem> relatedPartitionItems = relatedTable.getPartitionItems();
|
||||
long relatedPartitionId = getExistPartitionId(item,
|
||||
relatedTable.getPartitionInfo().getIdToItem(false));
|
||||
relatedPartitionItems);
|
||||
if (relatedPartitionId == -1L) {
|
||||
LOG.warn("can not found related partition: " + partitionId);
|
||||
return false;
|
||||
}
|
||||
isSyncWithPartition = isSyncWithPartition(mtmv, partitionId, relatedTable, relatedPartitionId);
|
||||
isSyncWithPartition = isSyncWithPartition(mtmv, partitionId, item, relatedTable, relatedPartitionId,
|
||||
relatedPartitionItems.get(relatedPartitionId));
|
||||
}
|
||||
return isSyncWithPartition && isSync(
|
||||
return isSyncWithPartition && isFresherThanTables(
|
||||
mtmv.getPartitionOrAnalysisException(partitionId).getVisibleVersionTimeIgnoreInit(), tables,
|
||||
excludedTriggerTables, gracePeriod);
|
||||
|
||||
@ -124,10 +109,10 @@ public class MTMVUtil {
|
||||
* @throws DdlException
|
||||
* @throws AnalysisException
|
||||
*/
|
||||
public static void alignMvPartition(MTMV mtmv, OlapTable relatedTable)
|
||||
public static void alignMvPartition(MTMV mtmv, MTMVRelatedTableIf relatedTable)
|
||||
throws DdlException, AnalysisException {
|
||||
Map<Long, PartitionItem> relatedTableItems = relatedTable.getPartitionInfo().getIdToItem(false);
|
||||
Map<Long, PartitionItem> mtmvItems = mtmv.getPartitionInfo().getIdToItem(false);
|
||||
Map<Long, PartitionItem> relatedTableItems = relatedTable.getPartitionItems();
|
||||
Map<Long, PartitionItem> mtmvItems = mtmv.getPartitionItems();
|
||||
// drop partition of mtmv
|
||||
for (Entry<Long, PartitionItem> entry : mtmvItems.entrySet()) {
|
||||
long partitionId = getExistPartitionId(entry.getValue(), relatedTableItems);
|
||||
@ -139,40 +124,11 @@ public class MTMVUtil {
|
||||
for (Entry<Long, PartitionItem> entry : relatedTableItems.entrySet()) {
|
||||
long partitionId = getExistPartitionId(entry.getValue(), mtmvItems);
|
||||
if (partitionId == -1L) {
|
||||
addPartition(mtmv, relatedTable, entry.getKey());
|
||||
addPartition(mtmv, entry.getValue());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* get mv.partitions which not sync with relatedTable
|
||||
* <p>
|
||||
* Comparing the time of mtmv and relatedTable partitioning,
|
||||
* if the visibleVersionTime of the base table is later,
|
||||
* then the partitioning of this mtmv is considered stale
|
||||
*
|
||||
* @param mtmv
|
||||
* @param relatedTable
|
||||
* @return partitionIds
|
||||
* @throws DdlException when partition can not found
|
||||
*/
|
||||
public static Set<Long> getMTMVStalePartitions(MTMV mtmv, OlapTable relatedTable)
|
||||
throws AnalysisException {
|
||||
Set<Long> ids = Sets.newHashSet();
|
||||
Map<Long, Set<Long>> mvToBasePartitions = getMvToBasePartitions(mtmv, relatedTable);
|
||||
for (Entry<Long, Set<Long>> entry : mvToBasePartitions.entrySet()) {
|
||||
for (Long relatedPartitionId : entry.getValue()) {
|
||||
boolean syncWithRelatedPartition = isSyncWithPartition(mtmv, entry.getKey(), relatedTable,
|
||||
relatedPartitionId);
|
||||
if (!syncWithRelatedPartition) {
|
||||
ids.add(entry.getKey());
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
return ids;
|
||||
}
|
||||
|
||||
public static List<String> getPartitionNamesByIds(MTMV mtmv, Collection<Long> ids) throws AnalysisException {
|
||||
List<String> res = Lists.newArrayList();
|
||||
for (Long partitionId : ids) {
|
||||
@ -201,7 +157,34 @@ public class MTMVUtil {
|
||||
if (mtmvRelation == null) {
|
||||
return false;
|
||||
}
|
||||
return isMTMVSync(mtmv, mtmv.getRelation().getBaseTables(), Sets.newHashSet(), 0L);
|
||||
try {
|
||||
return isMTMVSync(mtmv, mtmvRelation.getBaseTables(), Sets.newHashSet(), 0L);
|
||||
} catch (AnalysisException e) {
|
||||
LOG.warn("isMTMVSync failed: ", e);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Determine whether the mtmv is sync with tables
|
||||
*
|
||||
* @param mtmv
|
||||
* @param tables
|
||||
* @param excludeTables
|
||||
* @param gracePeriod
|
||||
* @return
|
||||
* @throws AnalysisException
|
||||
*/
|
||||
public static boolean isMTMVSync(MTMV mtmv, Set<BaseTableInfo> tables, Set<String> excludeTables, long gracePeriod)
|
||||
throws AnalysisException {
|
||||
Collection<Partition> partitions = mtmv.getPartitions();
|
||||
for (Partition partition : partitions) {
|
||||
if (!isMTMVPartitionSync(mtmv, partition.getId(), tables, excludeTables,
|
||||
gracePeriod)) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -217,24 +200,26 @@ public class MTMVUtil {
|
||||
long maxAvailableTime = mtmv.getPartitionOrAnalysisException(partitionId).getVisibleVersionTimeIgnoreInit();
|
||||
for (BaseTableInfo baseTableInfo : mtmv.getRelation().getBaseTables()) {
|
||||
TableIf table = getTable(baseTableInfo);
|
||||
if (!(table instanceof OlapTable)) {
|
||||
if (!(table instanceof MTMVRelatedTableIf)) {
|
||||
continue;
|
||||
}
|
||||
OlapTable olapTable = (OlapTable) table;
|
||||
MTMVRelatedTableIf mtmvRelatedTableIf = (MTMVRelatedTableIf) table;
|
||||
if (mtmv.getMvPartitionInfo().getPartitionType() == MTMVPartitionType.FOLLOW_BASE_TABLE && mtmv
|
||||
.getMvPartitionInfo().getRelatedTable().equals(baseTableInfo)) {
|
||||
.getMvPartitionInfo().getRelatedTableInfo().equals(baseTableInfo)) {
|
||||
PartitionItem item = mtmv.getPartitionInfo().getItemOrAnalysisException(partitionId);
|
||||
Map<Long, PartitionItem> relatedPartitionItems = mtmvRelatedTableIf.getPartitionItems();
|
||||
long relatedPartitionId = getExistPartitionId(item,
|
||||
olapTable.getPartitionInfo().getIdToItem(false));
|
||||
relatedPartitionItems);
|
||||
if (relatedPartitionId == -1L) {
|
||||
throw new AnalysisException("can not found related partition");
|
||||
}
|
||||
boolean isSyncWithPartition = isSyncWithPartition(mtmv, partitionId, olapTable, relatedPartitionId);
|
||||
boolean isSyncWithPartition = isSyncWithPartition(mtmv, partitionId, item, mtmvRelatedTableIf,
|
||||
relatedPartitionId, relatedPartitionItems.get(relatedPartitionId));
|
||||
if (!isSyncWithPartition) {
|
||||
res.add(olapTable.getName());
|
||||
res.add(mtmvRelatedTableIf.getName());
|
||||
}
|
||||
} else {
|
||||
long tableLastVisibleVersionTime = getTableMaxVisibleVersionTime((OlapTable) table);
|
||||
long tableLastVisibleVersionTime = mtmvRelatedTableIf.getLastModifyTime();
|
||||
if (tableLastVisibleVersionTime > maxAvailableTime) {
|
||||
res.add(table.getName());
|
||||
}
|
||||
@ -261,6 +246,7 @@ public class MTMVUtil {
|
||||
.isMaterializedViewRewriteEnableContainExternalTable()) {
|
||||
return res;
|
||||
}
|
||||
|
||||
MTMVRelation mtmvRelation = mtmv.getRelation();
|
||||
if (mtmvRelation == null) {
|
||||
return res;
|
||||
@ -291,12 +277,19 @@ public class MTMVUtil {
|
||||
return res;
|
||||
}
|
||||
|
||||
public static List<Long> getMTMVNeedRefreshPartitions(MTMV mtmv) {
|
||||
/**
|
||||
* Get the partitions that need to be refreshed
|
||||
*
|
||||
* @param mtmv
|
||||
* @param baseTables
|
||||
* @return
|
||||
*/
|
||||
public static List<Long> getMTMVNeedRefreshPartitions(MTMV mtmv, Set<BaseTableInfo> baseTables) {
|
||||
Collection<Partition> allPartitions = mtmv.getPartitions();
|
||||
List<Long> res = Lists.newArrayList();
|
||||
for (Partition partition : allPartitions) {
|
||||
try {
|
||||
if (!isMTMVPartitionSync(mtmv, partition.getId(), mtmv.getRelation().getBaseTables(),
|
||||
if (!isMTMVPartitionSync(mtmv, partition.getId(), baseTables,
|
||||
mtmv.getExcludedTriggerTables(),
|
||||
0L)) {
|
||||
res.add(partition.getId());
|
||||
@ -315,14 +308,15 @@ public class MTMVUtil {
|
||||
* @param mtmv
|
||||
* @param mtmvPartitionId
|
||||
* @param relatedTable
|
||||
* @param relatedTablePartitionId
|
||||
* @param relatedPartitionId
|
||||
* @return
|
||||
* @throws AnalysisException
|
||||
*/
|
||||
private static boolean isSyncWithPartition(MTMV mtmv, Long mtmvPartitionId, OlapTable relatedTable,
|
||||
Long relatedTablePartitionId) throws AnalysisException {
|
||||
return mtmv.getPartitionOrAnalysisException(mtmvPartitionId).getVisibleVersionTimeIgnoreInit() >= relatedTable
|
||||
.getPartitionOrAnalysisException(relatedTablePartitionId).getVisibleVersionTimeIgnoreInit();
|
||||
private static boolean isSyncWithPartition(MTMV mtmv, Long mtmvPartitionId, PartitionItem mtmvPartitionItem,
|
||||
MTMVRelatedTableIf relatedTable,
|
||||
Long relatedPartitionId, PartitionItem relatedPartitionItem) throws AnalysisException {
|
||||
return mtmv.getPartitionLastModifyTime(mtmvPartitionId, mtmvPartitionItem) >= relatedTable
|
||||
.getPartitionLastModifyTime(relatedPartitionId, relatedPartitionItem);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -358,23 +352,18 @@ public class MTMVUtil {
|
||||
* add partition for mtmv like relatedPartitionId of relatedTable
|
||||
*
|
||||
* @param mtmv
|
||||
* @param relatedTable
|
||||
* @param relatedPartitionId
|
||||
* @throws AnalysisException
|
||||
* @param partitionItem
|
||||
* @throws DdlException
|
||||
*/
|
||||
private static void addPartition(MTMV mtmv, OlapTable relatedTable, Long relatedPartitionId)
|
||||
throws AnalysisException, DdlException {
|
||||
PartitionDesc partitionDesc = relatedTable.getPartitionInfo().toPartitionDesc(relatedTable);
|
||||
Partition partition = relatedTable.getPartitionOrAnalysisException(relatedPartitionId);
|
||||
SinglePartitionDesc oldPartitionDesc = partitionDesc.getSinglePartitionDescByName(partition.getName());
|
||||
|
||||
private static void addPartition(MTMV mtmv, PartitionItem partitionItem)
|
||||
throws DdlException {
|
||||
PartitionKeyDesc oldPartitionKeyDesc = partitionItem.toPartitionKeyDesc();
|
||||
Map<String, String> partitionProperties = Maps.newHashMap();
|
||||
SinglePartitionDesc singleRangePartitionDesc = new SinglePartitionDesc(true,
|
||||
generatePartitionName(oldPartitionDesc.getPartitionKeyDesc()),
|
||||
oldPartitionDesc.getPartitionKeyDesc(), partitionProperties);
|
||||
SinglePartitionDesc singlePartitionDesc = new SinglePartitionDesc(true,
|
||||
generatePartitionName(oldPartitionKeyDesc),
|
||||
oldPartitionKeyDesc, partitionProperties);
|
||||
|
||||
AddPartitionClause addPartitionClause = new AddPartitionClause(singleRangePartitionDesc,
|
||||
AddPartitionClause addPartitionClause = new AddPartitionClause(singlePartitionDesc,
|
||||
mtmv.getDefaultDistributionInfo().toDistributionDesc(), partitionProperties, false);
|
||||
Env.getCurrentEnv().addPartition((Database) mtmv.getDatabase(), mtmv.getName(), addPartitionClause);
|
||||
}
|
||||
@ -396,68 +385,6 @@ public class MTMVUtil {
|
||||
return -1L;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the maximum update time among all partitions
|
||||
*
|
||||
* @param table
|
||||
* @return
|
||||
*/
|
||||
private static long getTableMaxVisibleVersionTime(OlapTable table) {
|
||||
long result = 0L;
|
||||
long visibleVersionTime;
|
||||
for (Partition partition : table.getAllPartitions()) {
|
||||
visibleVersionTime = partition.getVisibleVersionTimeIgnoreInit();
|
||||
if (visibleVersionTime > result) {
|
||||
result = visibleVersionTime;
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the minimum update time among all partitions
|
||||
*
|
||||
* @param table
|
||||
* @return
|
||||
*/
|
||||
private static long getTableMinVisibleVersionTime(OlapTable table) {
|
||||
long result = Long.MAX_VALUE;
|
||||
long visibleVersionTime;
|
||||
for (Partition partition : table.getAllPartitions()) {
|
||||
visibleVersionTime = partition.getVisibleVersionTimeIgnoreInit();
|
||||
if (visibleVersionTime < result) {
|
||||
result = visibleVersionTime;
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* Obtain the partition correspondence between materialized views and base tables
|
||||
* Currently, there is a one-to-one correspondence between the partitions of materialized views and base tables,
|
||||
* but for scalability reasons, Set is used
|
||||
* <p>
|
||||
* before use this method,should call `alignMvPartition`
|
||||
*
|
||||
* @param mtmv
|
||||
* @param relatedTable
|
||||
* @return mv.partitionId ==> relatedTable.partitionId
|
||||
*/
|
||||
public static Map<Long, Set<Long>> getMvToBasePartitions(MTMV mtmv, OlapTable relatedTable)
|
||||
throws AnalysisException {
|
||||
HashMap<Long, Set<Long>> res = Maps.newHashMap();
|
||||
Map<Long, PartitionItem> relatedTableItems = relatedTable.getPartitionInfo().getIdToItem(false);
|
||||
Map<Long, PartitionItem> mtmvItems = mtmv.getPartitionInfo().getIdToItem(false);
|
||||
for (Entry<Long, PartitionItem> entry : mtmvItems.entrySet()) {
|
||||
long partitionId = getExistPartitionId(entry.getValue(), relatedTableItems);
|
||||
if (partitionId == -1L) {
|
||||
throw new AnalysisException("partition not found: " + entry.getValue().toString());
|
||||
}
|
||||
res.put(entry.getKey(), Sets.newHashSet(partitionId));
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
/**
|
||||
* Determine is sync, ignoring excludedTriggerTables and non OlapTanle
|
||||
*
|
||||
@ -467,8 +394,8 @@ public class MTMVUtil {
|
||||
* @param gracePeriod
|
||||
* @return
|
||||
*/
|
||||
private static boolean isSync(long visibleVersionTime, Set<BaseTableInfo> tables,
|
||||
Set<String> excludedTriggerTables, Long gracePeriod) {
|
||||
private static boolean isFresherThanTables(long visibleVersionTime, Set<BaseTableInfo> tables,
|
||||
Set<String> excludedTriggerTables, Long gracePeriod) throws AnalysisException {
|
||||
long maxAvailableTime = visibleVersionTime + gracePeriod;
|
||||
for (BaseTableInfo baseTableInfo : tables) {
|
||||
TableIf table = null;
|
||||
@ -481,10 +408,10 @@ public class MTMVUtil {
|
||||
if (excludedTriggerTables.contains(table.getName())) {
|
||||
continue;
|
||||
}
|
||||
if (!(table instanceof OlapTable)) {
|
||||
if (!(table instanceof MTMVRelatedTableIf)) {
|
||||
continue;
|
||||
}
|
||||
long tableLastVisibleVersionTime = getTableMaxVisibleVersionTime((OlapTable) table);
|
||||
long tableLastVisibleVersionTime = ((MTMVRelatedTableIf) table).getLastModifyTime();
|
||||
if (tableLastVisibleVersionTime > maxAvailableTime) {
|
||||
return false;
|
||||
}
|
||||
|
||||
@ -310,7 +310,7 @@ public abstract class AbstractMaterializedViewRule implements ExplorationRuleFac
|
||||
}
|
||||
// check mv related table partition is valid or not
|
||||
MTMVPartitionInfo mvCustomPartitionInfo = mtmv.getMvPartitionInfo();
|
||||
BaseTableInfo relatedPartitionTable = mvCustomPartitionInfo.getRelatedTable();
|
||||
BaseTableInfo relatedPartitionTable = mvCustomPartitionInfo.getRelatedTableInfo();
|
||||
if (relatedPartitionTable == null) {
|
||||
return ImmutableSet.of();
|
||||
}
|
||||
|
||||
@ -18,11 +18,11 @@
|
||||
package org.apache.doris.nereids.rules.exploration.mv;
|
||||
|
||||
import org.apache.doris.catalog.Column;
|
||||
import org.apache.doris.catalog.OlapTable;
|
||||
import org.apache.doris.catalog.PartitionInfo;
|
||||
import org.apache.doris.catalog.PartitionType;
|
||||
import org.apache.doris.catalog.TableIf;
|
||||
import org.apache.doris.common.Pair;
|
||||
import org.apache.doris.mtmv.BaseTableInfo;
|
||||
import org.apache.doris.mtmv.MTMVRelatedTableIf;
|
||||
import org.apache.doris.nereids.CascadesContext;
|
||||
import org.apache.doris.nereids.trees.expressions.Expression;
|
||||
import org.apache.doris.nereids.trees.expressions.NamedExpression;
|
||||
@ -43,6 +43,7 @@ import org.apache.doris.nereids.trees.plans.logical.LogicalResultSink;
|
||||
import org.apache.doris.nereids.trees.plans.logical.LogicalWindow;
|
||||
import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanVisitor;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
@ -84,14 +85,13 @@ public class MaterializedViewUtils {
|
||||
// check sql pattern
|
||||
IncrementCheckerContext context = new IncrementCheckerContext(columnSlot);
|
||||
materializedViewPlan.accept(MaterializedViewIncrementChecker.INSTANCE, context);
|
||||
if (context.getRelatedTable() == null
|
||||
|| context.getRelatedTableColumn() == null
|
||||
|| !context.isPctPossible()) {
|
||||
if (context.getTableColumnList().isEmpty() || !context.isPctPossible()) {
|
||||
return Optional.empty();
|
||||
}
|
||||
return Optional.of(new RelatedTableInfo(new BaseTableInfo(context.getRelatedTable()),
|
||||
// TODO support to return only one related table info, support multi later
|
||||
return Optional.of(new RelatedTableInfo(new BaseTableInfo(context.getTableColumnList().get(0).key()),
|
||||
context.isPctPossible(),
|
||||
context.getRelatedTableColumn().getName()));
|
||||
context.getTableColumnList().get(0).value().getName()));
|
||||
}
|
||||
|
||||
/**
|
||||
@ -222,25 +222,25 @@ public class MaterializedViewUtils {
|
||||
|
||||
@Override
|
||||
public Void visitLogicalRelation(LogicalRelation relation, IncrementCheckerContext context) {
|
||||
if (!(relation instanceof LogicalCatalogRelation) || context.getRelatedTable() != null) {
|
||||
if (!(relation instanceof LogicalCatalogRelation) || !context.getTableColumnList().isEmpty()) {
|
||||
return visit(relation, context);
|
||||
}
|
||||
LogicalCatalogRelation logicalCatalogRelation = (LogicalCatalogRelation) relation;
|
||||
TableIf table = logicalCatalogRelation.getTable();
|
||||
if (!(table instanceof OlapTable)) {
|
||||
if (!(table instanceof MTMVRelatedTableIf)) {
|
||||
return visit(relation, context);
|
||||
}
|
||||
OlapTable olapTable = (OlapTable) table;
|
||||
PartitionInfo partitionInfo = olapTable.getPartitionInfo();
|
||||
Set<Column> partitionColumnSet = new HashSet<>(partitionInfo.getPartitionColumns());
|
||||
if (PartitionType.UNPARTITIONED.equals(partitionInfo.getType())) {
|
||||
MTMVRelatedTableIf relatedTable = (MTMVRelatedTableIf) table;
|
||||
PartitionType type = relatedTable.getPartitionType();
|
||||
|
||||
if (PartitionType.UNPARTITIONED.equals(type)) {
|
||||
return visit(relation, context);
|
||||
}
|
||||
Set<Column> partitionColumnSet = new HashSet<>(relatedTable.getPartitionColumns());
|
||||
Column mvReferenceColumn = context.getMvPartitionColumn().getColumn().get();
|
||||
if (partitionColumnSet.contains(mvReferenceColumn)) {
|
||||
context.setRelatedTable(table);
|
||||
context.setRelatedTableColumn(mvReferenceColumn);
|
||||
context.setPctPossible(!mvReferenceColumn.isAllowNull());
|
||||
context.addTableColumn(table, mvReferenceColumn);
|
||||
context.setPctPossible(true);
|
||||
}
|
||||
return visit(relation, context);
|
||||
}
|
||||
@ -313,8 +313,7 @@ public class MaterializedViewUtils {
|
||||
private static final class IncrementCheckerContext {
|
||||
private final SlotReference mvPartitionColumn;
|
||||
private boolean pctPossible = true;
|
||||
private TableIf relatedTable;
|
||||
private Column relatedTableColumn;
|
||||
private final List<Pair<TableIf, Column>> tableColumnList = new ArrayList<>();
|
||||
private boolean joinNullGenerateSide;
|
||||
|
||||
public IncrementCheckerContext(SlotReference mvPartitionColumn) {
|
||||
@ -333,20 +332,12 @@ public class MaterializedViewUtils {
|
||||
this.pctPossible = pctPossible;
|
||||
}
|
||||
|
||||
public TableIf getRelatedTable() {
|
||||
return relatedTable;
|
||||
public void addTableColumn(TableIf relatedTable, Column partitionColumn) {
|
||||
tableColumnList.add(Pair.of(relatedTable, partitionColumn));
|
||||
}
|
||||
|
||||
public void setRelatedTable(TableIf relatedTable) {
|
||||
this.relatedTable = relatedTable;
|
||||
}
|
||||
|
||||
public Column getRelatedTableColumn() {
|
||||
return relatedTableColumn;
|
||||
}
|
||||
|
||||
public void setRelatedTableColumn(Column relatedTableColumn) {
|
||||
this.relatedTableColumn = relatedTableColumn;
|
||||
public List<Pair<TableIf, Column>> getTableColumnList() {
|
||||
return tableColumnList;
|
||||
}
|
||||
|
||||
public boolean isJoinNullGenerateSide() {
|
||||
|
||||
@ -20,7 +20,6 @@ package org.apache.doris.nereids.trees.plans.commands;
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.catalog.ListPartitionItem;
|
||||
import org.apache.doris.catalog.MTMV;
|
||||
import org.apache.doris.catalog.OlapTable;
|
||||
import org.apache.doris.catalog.PartitionItem;
|
||||
import org.apache.doris.catalog.PartitionKey;
|
||||
import org.apache.doris.catalog.TableIf;
|
||||
@ -72,9 +71,9 @@ public class UpdateMvByPartitionCommand extends InsertOverwriteTableCommand {
|
||||
* @return command
|
||||
*/
|
||||
public static UpdateMvByPartitionCommand from(MTMV mv, Set<Long> partitionIds,
|
||||
Map<OlapTable, String> tableWithPartKey) {
|
||||
Map<TableIf, String> tableWithPartKey) {
|
||||
NereidsParser parser = new NereidsParser();
|
||||
Map<OlapTable, Set<Expression>> predicates =
|
||||
Map<TableIf, Set<Expression>> predicates =
|
||||
constructTableWithPredicates(mv, partitionIds, tableWithPartKey);
|
||||
List<String> parts = constructPartsForMv(mv, partitionIds);
|
||||
Plan plan = parser.parseSingle(mv.getQuerySql());
|
||||
@ -94,12 +93,12 @@ public class UpdateMvByPartitionCommand extends InsertOverwriteTableCommand {
|
||||
.collect(ImmutableList.toImmutableList());
|
||||
}
|
||||
|
||||
private static Map<OlapTable, Set<Expression>> constructTableWithPredicates(MTMV mv,
|
||||
Set<Long> partitionIds, Map<OlapTable, String> tableWithPartKey) {
|
||||
private static Map<TableIf, Set<Expression>> constructTableWithPredicates(MTMV mv,
|
||||
Set<Long> partitionIds, Map<TableIf, String> tableWithPartKey) {
|
||||
Set<PartitionItem> items = partitionIds.stream()
|
||||
.map(id -> mv.getPartitionInfo().getItem(id))
|
||||
.collect(ImmutableSet.toImmutableSet());
|
||||
ImmutableMap.Builder<OlapTable, Set<Expression>> builder = new ImmutableMap.Builder<>();
|
||||
ImmutableMap.Builder<TableIf, Set<Expression>> builder = new ImmutableMap.Builder<>();
|
||||
tableWithPartKey.forEach((table, colName) ->
|
||||
builder.put(table, constructPredicates(items, colName))
|
||||
);
|
||||
@ -137,13 +136,13 @@ public class UpdateMvByPartitionCommand extends InsertOverwriteTableCommand {
|
||||
}
|
||||
}
|
||||
|
||||
static class PredicateAdder extends DefaultPlanRewriter<Map<OlapTable, Set<Expression>>> {
|
||||
static class PredicateAdder extends DefaultPlanRewriter<Map<TableIf, Set<Expression>>> {
|
||||
@Override
|
||||
public Plan visitUnboundRelation(UnboundRelation unboundRelation, Map<OlapTable, Set<Expression>> predicates) {
|
||||
public Plan visitUnboundRelation(UnboundRelation unboundRelation, Map<TableIf, Set<Expression>> predicates) {
|
||||
List<String> tableQualifier = RelationUtil.getQualifierName(ConnectContext.get(),
|
||||
unboundRelation.getNameParts());
|
||||
TableIf table = RelationUtil.getTable(tableQualifier, Env.getCurrentEnv());
|
||||
if (table instanceof OlapTable && predicates.containsKey(table)) {
|
||||
if (predicates.containsKey(table)) {
|
||||
return new LogicalFilter<>(ImmutableSet.of(ExpressionUtils.or(predicates.get(table))),
|
||||
unboundRelation);
|
||||
}
|
||||
|
||||
@ -26,7 +26,6 @@ import org.apache.doris.analysis.TableName;
|
||||
import org.apache.doris.catalog.Column;
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.catalog.KeysType;
|
||||
import org.apache.doris.catalog.OlapTable;
|
||||
import org.apache.doris.catalog.PartitionType;
|
||||
import org.apache.doris.catalog.TableIf;
|
||||
import org.apache.doris.catalog.TableIf.TableType;
|
||||
@ -40,6 +39,7 @@ import org.apache.doris.mtmv.MTMVPartitionInfo;
|
||||
import org.apache.doris.mtmv.MTMVPartitionInfo.MTMVPartitionType;
|
||||
import org.apache.doris.mtmv.MTMVPlanUtil;
|
||||
import org.apache.doris.mtmv.MTMVRefreshInfo;
|
||||
import org.apache.doris.mtmv.MTMVRelatedTableIf;
|
||||
import org.apache.doris.mtmv.MTMVRelation;
|
||||
import org.apache.doris.mtmv.MTMVUtil;
|
||||
import org.apache.doris.mysql.privilege.PrivPredicate;
|
||||
@ -269,18 +269,19 @@ public class CreateMTMVInfo {
|
||||
if (!relatedTableInfo.isPresent() || !relatedTableInfo.get().isPctPossible()) {
|
||||
throw new AnalysisException("Unable to find a suitable base table for partitioning");
|
||||
}
|
||||
TableIf followTable = null;
|
||||
TableIf relatedTable = null;
|
||||
try {
|
||||
followTable = MTMVUtil.getTable(relatedTableInfo.get().getTableInfo());
|
||||
relatedTable = MTMVUtil.getTable(relatedTableInfo.get().getTableInfo());
|
||||
} catch (org.apache.doris.common.AnalysisException e) {
|
||||
throw new AnalysisException(e.getMessage(), e);
|
||||
}
|
||||
if (!(followTable instanceof OlapTable)) {
|
||||
throw new AnalysisException("base table for partitioning only can be OlapTable.");
|
||||
if (!(relatedTable instanceof MTMVRelatedTableIf)) {
|
||||
throw new AnalysisException("base table for partitioning only can be OlapTable or HMSTable");
|
||||
}
|
||||
MTMVRelatedTableIf mtmvBaseRealtedTable = (MTMVRelatedTableIf) relatedTable;
|
||||
Set<String> partitionColumnNames = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER);
|
||||
try {
|
||||
partitionColumnNames.addAll(((OlapTable) followTable).getPartitionColumnNames());
|
||||
partitionColumnNames.addAll(mtmvBaseRealtedTable.getPartitionColumnNames());
|
||||
} catch (DdlException e) {
|
||||
throw new AnalysisException(e.getMessage(), e);
|
||||
}
|
||||
@ -293,7 +294,7 @@ public class CreateMTMVInfo {
|
||||
}
|
||||
mvPartitionInfo.setRelatedTable(relatedTableInfo.get().getTableInfo());
|
||||
mvPartitionInfo.setRelatedCol(relatedTableInfo.get().getColumn());
|
||||
partitionDesc = generatePartitionDesc((OlapTable) followTable);
|
||||
partitionDesc = generatePartitionDesc(mtmvBaseRealtedTable);
|
||||
} finally {
|
||||
// after operate, roll back the disable rules
|
||||
sessionVariable.setDisableNereidsRules(String.join(",", tempDisableRules));
|
||||
@ -302,9 +303,9 @@ public class CreateMTMVInfo {
|
||||
}
|
||||
}
|
||||
|
||||
private PartitionDesc generatePartitionDesc(OlapTable relatedTable) {
|
||||
PartitionType type = relatedTable.getPartitionInfo().getType();
|
||||
private PartitionDesc generatePartitionDesc(MTMVRelatedTableIf relatedTable) {
|
||||
try {
|
||||
PartitionType type = relatedTable.getPartitionType();
|
||||
if (type == PartitionType.RANGE) {
|
||||
return new RangePartitionDesc(Lists.newArrayList(mvPartitionInfo.getPartitionCol()),
|
||||
Lists.newArrayList());
|
||||
|
||||
@ -53,6 +53,7 @@ import org.apache.doris.thrift.TFileType;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import lombok.Setter;
|
||||
import org.apache.hadoop.hive.common.ValidWriteIdList;
|
||||
import org.apache.hadoop.hive.metastore.api.FieldSchema;
|
||||
@ -178,7 +179,7 @@ public class HiveScanNode extends FileQueryScanNode {
|
||||
// so that we can unify the interface.
|
||||
HivePartition dummyPartition = new HivePartition(hmsTable.getDbName(), hmsTable.getName(), true,
|
||||
hmsTable.getRemoteTable().getSd().getInputFormat(),
|
||||
hmsTable.getRemoteTable().getSd().getLocation(), null);
|
||||
hmsTable.getRemoteTable().getSd().getLocation(), null, Maps.newHashMap());
|
||||
this.totalPartitionNum = 1;
|
||||
this.readPartitionNum = 1;
|
||||
resPartitions.add(dummyPartition);
|
||||
|
||||
@ -43,6 +43,7 @@ import org.apache.doris.thrift.THudiFileDesc;
|
||||
import org.apache.doris.thrift.TTableFormatFileDesc;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import org.apache.avro.Schema;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
@ -194,7 +195,8 @@ public class HudiScanNode extends HiveScanNode {
|
||||
return filteredPartitionIds.stream().map(id -> {
|
||||
String path = basePath + "/" + partitionIdToNameMap.get(id);
|
||||
return new HivePartition(
|
||||
dbName, tblName, false, inputFormat, path, partitionValuesMap.get(id));
|
||||
dbName, tblName, false, inputFormat, path, partitionValuesMap.get(id),
|
||||
Maps.newHashMap());
|
||||
}).collect(Collectors.toList());
|
||||
} finally {
|
||||
partitionValues.readLock().unlock();
|
||||
@ -205,7 +207,7 @@ public class HudiScanNode extends HiveScanNode {
|
||||
// so that we can unify the interface.
|
||||
HivePartition dummyPartition = new HivePartition(hmsTable.getDbName(), hmsTable.getName(), true,
|
||||
hmsTable.getRemoteTable().getSd().getInputFormat(),
|
||||
hmsTable.getRemoteTable().getSd().getLocation(), null);
|
||||
hmsTable.getRemoteTable().getSd().getLocation(), null, Maps.newHashMap());
|
||||
this.totalPartitionNum = 1;
|
||||
this.readPartitionNum = 1;
|
||||
return Lists.newArrayList(dummyPartition);
|
||||
|
||||
@ -76,6 +76,7 @@ import org.apache.doris.system.Frontend;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import org.apache.commons.collections.CollectionUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.commons.text.StringSubstitutor;
|
||||
@ -714,7 +715,7 @@ public class StatisticsUtil {
|
||||
} else {
|
||||
hivePartitions.add(new HivePartition(table.getDbName(), table.getName(), true,
|
||||
table.getRemoteTable().getSd().getInputFormat(),
|
||||
table.getRemoteTable().getSd().getLocation(), null));
|
||||
table.getRemoteTable().getSd().getLocation(), null, Maps.newHashMap()));
|
||||
}
|
||||
// Get files for all partitions.
|
||||
String bindBrokerName = table.getCatalog().bindBrokerName();
|
||||
|
||||
@ -26,6 +26,7 @@ import org.apache.doris.nereids.util.PlanChecker;
|
||||
import org.apache.doris.utframe.TestWithFeService;
|
||||
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.Disabled;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.util.Optional;
|
||||
@ -196,6 +197,7 @@ public class MaterializedViewUtilsTest extends TestWithFeService {
|
||||
}
|
||||
|
||||
@Test
|
||||
@Disabled
|
||||
public void getRelatedTableInfoTestWithoutGroupNullTest() {
|
||||
PlanChecker.from(connectContext)
|
||||
.checkExplain("SELECT (o.c1_abs + ps.c2_abs) as add_alias, l.L_SHIPDATE, l.L_ORDERKEY, o.O_ORDERDATE, "
|
||||
|
||||
14
regression-test/data/mtmv_p0/test_hive_mtmv.out
Normal file
14
regression-test/data/mtmv_p0/test_hive_mtmv.out
Normal file
@ -0,0 +1,14 @@
|
||||
-- This file is automatically generated. You should know what you did if you want to edit this
|
||||
-- !refresh_one_partition --
|
||||
1 A 20230101
|
||||
2 B 20230101
|
||||
3 C 20230101
|
||||
|
||||
-- !refresh_other_partition --
|
||||
1 A 20230101
|
||||
2 B 20230101
|
||||
3 C 20230101
|
||||
4 D 20230102
|
||||
5 E 20230102
|
||||
6 F 20230102
|
||||
|
||||
73
regression-test/suites/mtmv_p0/test_hive_mtmv.groovy
Normal file
73
regression-test/suites/mtmv_p0/test_hive_mtmv.groovy
Normal file
@ -0,0 +1,73 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
suite("test_hive_mtmv", "p0,external,hive,external_docker,external_docker_hive") {
|
||||
String enabled = context.config.otherConfigs.get("enableHiveTest")
|
||||
if (enabled != null && enabled.equalsIgnoreCase("true")) {
|
||||
try {
|
||||
String hms_port = context.config.otherConfigs.get("hms_port")
|
||||
String catalog_name = "hive_test_mtmv"
|
||||
String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
|
||||
|
||||
sql """drop catalog if exists ${catalog_name}"""
|
||||
sql """create catalog if not exists ${catalog_name} properties (
|
||||
"type"="hms",
|
||||
'hive.metastore.uris' = 'thrift://${externalEnvIp}:${hms_port}'
|
||||
);"""
|
||||
// sql """use `${catalog_name}`.`default`"""
|
||||
def mvName = "test_hive_mtmv"
|
||||
def dbName = "regression_test_mtmv_p0"
|
||||
sql """drop materialized view if exists ${mvName};"""
|
||||
|
||||
sql """
|
||||
CREATE MATERIALIZED VIEW ${mvName}
|
||||
BUILD DEFERRED REFRESH AUTO ON MANUAL
|
||||
partition by(`part_col`)
|
||||
DISTRIBUTED BY RANDOM BUCKETS 2
|
||||
PROPERTIES ('replication_num' = '1')
|
||||
AS
|
||||
SELECT * FROM ${catalog_name}.`default`.mtmv_base1;
|
||||
"""
|
||||
def showPartitionsResult = sql """show partitions from ${mvName}"""
|
||||
logger.info("showPartitionsResult: " + showPartitionsResult.toString())
|
||||
assertTrue(showPartitionsResult.toString().contains("p_20230101"))
|
||||
assertTrue(showPartitionsResult.toString().contains("p_20230102"))
|
||||
|
||||
// refresh one partitions
|
||||
sql """
|
||||
REFRESH MATERIALIZED VIEW ${mvName} partitions(p_20230101);
|
||||
"""
|
||||
jobName = getJobName(dbName, mvName);
|
||||
log.info(jobName)
|
||||
waitingMTMVTaskFinished(jobName)
|
||||
order_qt_refresh_one_partition "SELECT * FROM ${mvName} order by id"
|
||||
|
||||
//refresh other partitions
|
||||
sql """
|
||||
REFRESH MATERIALIZED VIEW ${mvName}
|
||||
"""
|
||||
waitingMTMVTaskFinished(jobName)
|
||||
order_qt_refresh_other_partition "SELECT * FROM ${mvName} order by id"
|
||||
|
||||
sql """drop materialized view if exists ${mvName};"""
|
||||
|
||||
sql """drop catalog if exists ${catalog_name}"""
|
||||
} finally {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user