[enhance](mtmv)use version instead of timestamp (#30599)

On each data refresh, the MTMV records snapshot information (versions, or timestamps where no version is available), which is later compared to decide whether partitions need to be refreshed
This commit is contained in:
zhangdong
2024-02-01 14:42:47 +08:00
committed by yiguolei
parent b86bd2672f
commit 379b541494
16 changed files with 598 additions and 107 deletions

View File

@ -897,7 +897,7 @@ public class Alter {
mtmv.alterMvProperties(alterMTMV.getMvProperties());
break;
case ADD_TASK:
mtmv.addTaskResult(alterMTMV.getTask(), alterMTMV.getRelation());
mtmv.addTaskResult(alterMTMV.getTask(), alterMTMV.getRelation(), alterMTMV.getPartitionSnapshots());
Env.getCurrentEnv().getMtmvService()
.refreshComplete(mtmv, alterMTMV.getRelation(), alterMTMV.getTask());
break;

View File

@ -178,6 +178,7 @@ import org.apache.doris.master.PartitionInMemoryInfoCollector;
import org.apache.doris.meta.MetaContext;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.mtmv.MTMVAlterOpType;
import org.apache.doris.mtmv.MTMVRefreshPartitionSnapshot;
import org.apache.doris.mtmv.MTMVRelation;
import org.apache.doris.mtmv.MTMVService;
import org.apache.doris.mtmv.MTMVStatus;
@ -5988,10 +5989,12 @@ public class Env {
this.alter.processAlterMTMV(alter, false);
}
public void addMTMVTaskResult(TableNameInfo mvName, MTMVTask task, MTMVRelation relation) {
public void addMTMVTaskResult(TableNameInfo mvName, MTMVTask task, MTMVRelation relation,
Map<String, MTMVRefreshPartitionSnapshot> partitionSnapshots) {
AlterMTMV alter = new AlterMTMV(mvName, MTMVAlterOpType.ADD_TASK);
alter.setTask(task);
alter.setRelation(relation);
alter.setPartitionSnapshots(partitionSnapshots);
this.alter.processAlterMTMV(alter, false);
}

View File

@ -32,6 +32,8 @@ import org.apache.doris.mtmv.MTMVPlanUtil;
import org.apache.doris.mtmv.MTMVRefreshEnum.MTMVRefreshState;
import org.apache.doris.mtmv.MTMVRefreshEnum.MTMVState;
import org.apache.doris.mtmv.MTMVRefreshInfo;
import org.apache.doris.mtmv.MTMVRefreshPartitionSnapshot;
import org.apache.doris.mtmv.MTMVRefreshSnapshot;
import org.apache.doris.mtmv.MTMVRelation;
import org.apache.doris.mtmv.MTMVStatus;
import org.apache.doris.persist.gson.GsonUtils;
@ -69,6 +71,8 @@ public class MTMV extends OlapTable {
private MTMVRelation relation;
@SerializedName("mpi")
private MTMVPartitionInfo mvPartitionInfo;
@SerializedName("rs")
private MTMVRefreshSnapshot refreshSnapshot;
// Should update after every fresh, not persist
private MTMVCache cache;
@ -96,6 +100,7 @@ public class MTMV extends OlapTable {
this.mvProperties = params.mvProperties;
this.mvPartitionInfo = params.mvPartitionInfo;
this.relation = params.relation;
this.refreshSnapshot = new MTMVRefreshSnapshot();
mvRwLock = new ReentrantReadWriteLock(true);
}
@ -145,7 +150,8 @@ public class MTMV extends OlapTable {
}
}
public void addTaskResult(MTMVTask task, MTMVRelation relation) {
public void addTaskResult(MTMVTask task, MTMVRelation relation,
Map<String, MTMVRefreshPartitionSnapshot> partitionSnapshots) {
try {
writeMvLock();
if (task.getStatus() == TaskStatus.SUCCESS) {
@ -165,6 +171,7 @@ public class MTMV extends OlapTable {
this.status.setRefreshState(MTMVRefreshState.FAIL);
}
this.jobInfo.addHistoryTask(task);
this.refreshSnapshot.updateSnapshots(partitionSnapshots, getPartitionNames());
} finally {
writeMvUnlock();
}
@ -177,7 +184,7 @@ public class MTMV extends OlapTable {
public long getGracePeriod() {
if (mvProperties.containsKey(PropertyAnalyzer.PROPERTIES_GRACE_PERIOD)) {
return Long.parseLong(mvProperties.get(PropertyAnalyzer.PROPERTIES_GRACE_PERIOD));
return Long.parseLong(mvProperties.get(PropertyAnalyzer.PROPERTIES_GRACE_PERIOD)) * 1000;
} else {
return 0L;
}
@ -222,6 +229,10 @@ public class MTMV extends OlapTable {
return mvPartitionInfo;
}
/**
 * Get the snapshots recorded for this MTMV.
 *
 * @return the refreshSnapshot field itself, not a copy
 */
public MTMVRefreshSnapshot getRefreshSnapshot() {
return refreshSnapshot;
}
public void readMvLock() {
this.mvRwLock.readLock().lock();
}
@ -256,6 +267,7 @@ public class MTMV extends OlapTable {
mvProperties = materializedView.mvProperties;
relation = materializedView.relation;
mvPartitionInfo = materializedView.mvPartitionInfo;
refreshSnapshot = materializedView.refreshSnapshot;
}
}

View File

@ -47,6 +47,8 @@ import org.apache.doris.common.io.Text;
import org.apache.doris.common.util.PropertyAnalyzer;
import org.apache.doris.common.util.Util;
import org.apache.doris.mtmv.MTMVRelatedTableIf;
import org.apache.doris.mtmv.MTMVSnapshotIf;
import org.apache.doris.mtmv.MTMVVersionSnapshot;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.OriginStatement;
import org.apache.doris.resource.Tag;
@ -2534,26 +2536,26 @@ public class OlapTable extends Table implements MTMVRelatedTableIf {
return getPartitionInfo().getIdToItem(false);
}
@Override
public long getPartitionLastModifyTime(long partitionId, PartitionItem item) throws AnalysisException {
return getPartitionOrAnalysisException(partitionId).getVisibleVersionTimeIgnoreInit();
}
@Override
public long getLastModifyTime() {
long result = 0L;
long visibleVersionTime;
for (Partition partition : getAllPartitions()) {
visibleVersionTime = partition.getVisibleVersionTimeIgnoreInit();
if (visibleVersionTime > result) {
result = visibleVersionTime;
}
}
return result;
}
@Override
public List<Column> getPartitionColumns() {
return getPartitionInfo().getPartitionColumns();
}
/**
 * Build a snapshot of one partition from its current visible version.
 *
 * @param partitionId id of the partition to snapshot
 * @return a version snapshot holding the partition's visible version
 * @throws AnalysisException propagated from getPartitionOrAnalysisException
 */
@Override
public MTMVSnapshotIf getPartitionSnapshot(long partitionId) throws AnalysisException {
    return new MTMVVersionSnapshot(getPartitionOrAnalysisException(partitionId).getVisibleVersion());
}
/**
 * Build a snapshot of the whole table from its current visible version.
 *
 * @return a version snapshot holding the table's visible version
 */
@Override
public MTMVSnapshotIf getTableSnapshot() {
    return new MTMVVersionSnapshot(getVisibleVersion());
}
/**
 * Resolve a partition id to the name of that partition.
 *
 * @param partitionId id of the partition to look up
 * @return the name of the partition returned by getPartitionOrAnalysisException
 * @throws AnalysisException propagated from getPartitionOrAnalysisException
 */
@Override
public String getPartitionName(long partitionId) throws AnalysisException {
return getPartitionOrAnalysisException(partitionId).getName();
}
}

View File

@ -33,7 +33,10 @@ import org.apache.doris.datasource.hive.HMSCachedClient;
import org.apache.doris.datasource.hive.HiveMetaStoreCache;
import org.apache.doris.datasource.hive.HivePartition;
import org.apache.doris.external.iceberg.util.IcebergUtils;
import org.apache.doris.mtmv.MTMVMaxTimestampSnapshot;
import org.apache.doris.mtmv.MTMVRelatedTableIf;
import org.apache.doris.mtmv.MTMVSnapshotIf;
import org.apache.doris.mtmv.MTMVTimestampSnapshot;
import org.apache.doris.nereids.exceptions.NotSupportedException;
import org.apache.doris.statistics.AnalysisInfo;
import org.apache.doris.statistics.BaseAnalysisTask;
@ -452,6 +455,15 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI
return initSchema();
}
/**
 * Read the last DDL time from the remote table's parameters.
 *
 * @return the parameter value multiplied by 1000 (presumably seconds converted to
 *         milliseconds; confirm against the metastore), or 0 when the table has no
 *         parameters or the parameter is absent
 */
public long getLastDdlTime() {
    makeSureInitialized();
    Map<String, String> tableParameters = remoteTable.getParameters();
    boolean hasLastDdlTime = tableParameters != null
            && tableParameters.containsKey(TBL_PROP_TRANSIENT_LAST_DDL_TIME);
    if (hasLastDdlTime) {
        String rawLastDdlTime = tableParameters.get(TBL_PROP_TRANSIENT_LAST_DDL_TIME);
        return Long.parseLong(rawLastDdlTime) * 1000;
    }
    return 0L;
}
@Override
public List<Column> initSchema() {
makeSureInitialized();
@ -561,7 +573,7 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI
public boolean hasColumnStatistics(String colName) {
Map<String, String> parameters = remoteTable.getParameters();
return parameters.keySet().stream()
.filter(k -> k.startsWith(SPARK_COL_STATS + colName + ".")).findAny().isPresent();
.filter(k -> k.startsWith(SPARK_COL_STATS + colName + ".")).findAny().isPresent();
}
public boolean fillColumnStatistics(String colName, Map<StatsType, String> statsTypes, Map<String, String> stats) {
@ -772,13 +784,13 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI
@Override
public boolean isDistributionColumn(String columnName) {
return getRemoteTable().getSd().getBucketCols().stream().map(String::toLowerCase)
.collect(Collectors.toSet()).contains(columnName.toLowerCase());
.collect(Collectors.toSet()).contains(columnName.toLowerCase());
}
@Override
public Set<String> getDistributionColumnNames() {
return getRemoteTable().getSd().getBucketCols().stream().map(String::toLowerCase)
.collect(Collectors.toSet());
.collect(Collectors.toSet());
}
@Override
@ -805,32 +817,73 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI
}
@Override
public long getPartitionLastModifyTime(long partitionId, PartitionItem item) throws AnalysisException {
List<List<String>> partitionValuesList = Lists.newArrayListWithCapacity(1);
partitionValuesList.add(
((ListPartitionItem) item).getItems().get(0).getPartitionValuesAsStringListForHive());
public String getPartitionName(long partitionId) throws AnalysisException {
    // Reverse lookup: the cached map is keyed by partition name, so scan it for the matching id.
    for (Entry<String, Long> nameAndId : getHivePartitionValues().getPartitionNameToIdMap().entrySet()) {
        Long candidateId = nameAndId.getValue();
        if (candidateId.equals(partitionId)) {
            return nameAndId.getKey();
        }
    }
    // No cached partition carries this id.
    throw new AnalysisException("can not find partition, partitionId: " + partitionId);
}
private HiveMetaStoreCache.HivePartitionValues getHivePartitionValues() {
HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
.getMetaStoreCache((HMSExternalCatalog) getCatalog());
List<HivePartition> resPartitions = cache.getAllPartitionsWithCache(getDbName(), getName(),
partitionValuesList);
if (resPartitions.size() != 1) {
throw new AnalysisException("partition not normal, size: " + resPartitions.size());
}
return resPartitions.get(0).getLastModifiedTimeIgnoreInit();
return cache.getPartitionValues(
getDbName(), getName(), getPartitionColumnTypes());
}
@Override
public long getLastModifyTime() throws AnalysisException {
public MTMVSnapshotIf getPartitionSnapshot(long partitionId) throws AnalysisException {
long partitionLastModifyTime = getPartitionLastModifyTime(partitionId);
return new MTMVTimestampSnapshot(partitionLastModifyTime);
}
long result = 0L;
@Override
public MTMVSnapshotIf getTableSnapshot() throws AnalysisException {
if (getPartitionType() == PartitionType.UNPARTITIONED) {
return new MTMVMaxTimestampSnapshot(-1L, getLastDdlTime());
}
long partitionId = 0L;
long maxVersionTime = 0L;
long visibleVersionTime;
for (Entry<Long, PartitionItem> entry : getPartitionItems().entrySet()) {
visibleVersionTime = getPartitionLastModifyTime(entry.getKey(), entry.getValue());
if (visibleVersionTime > result) {
result = visibleVersionTime;
visibleVersionTime = getPartitionLastModifyTime(entry.getKey());
if (visibleVersionTime > maxVersionTime) {
maxVersionTime = visibleVersionTime;
partitionId = entry.getKey();
}
}
return result;
return new MTMVMaxTimestampSnapshot(partitionId, maxVersionTime);
}
/**
 * Get the last modified time of the hive partition with the given id.
 *
 * @param partitionId id of the partition
 * @return the value of HivePartition#getLastModifiedTime for that partition
 * @throws AnalysisException if the partition cannot be resolved to exactly one hive partition
 */
private long getPartitionLastModifyTime(long partitionId) throws AnalysisException {
    HivePartition hivePartition = getPartitionById(partitionId);
    return hivePartition.getLastModifiedTime();
}
/**
 * Resolve a partition id to the single hive partition it denotes.
 *
 * @param partitionId id of the partition, as used in getPartitionItems()
 * @return the matching hive partition
 * @throws AnalysisException if no partition item exists for the id, or if the lookup by
 *         partition values does not return exactly one partition
 */
private HivePartition getPartitionById(long partitionId) throws AnalysisException {
    PartitionItem item = getPartitionItems().get(partitionId);
    if (item == null) {
        // The partition may have been dropped, or the cache refreshed, since the id was obtained.
        // Report it as an AnalysisException (which callers handle) instead of failing with a
        // NullPointerException when the item is dereferenced below.
        throw new AnalysisException("can not find partition, partitionId: " + partitionId);
    }
    List<List<String>> partitionValuesList = transferPartitionItemToPartitionValues(item);
    List<HivePartition> partitions = getPartitionsByPartitionValues(partitionValuesList);
    if (partitions.size() != 1) {
        throw new AnalysisException("partition not normal, size: " + partitions.size());
    }
    return partitions.get(0);
}
/**
 * Fetch hive partitions for the given partition values through the metastore cache
 * of this table's catalog.
 *
 * @param partitionValuesList one list of partition values per requested partition
 * @return the partitions returned by HiveMetaStoreCache#getAllPartitionsWithCache
 */
private List<HivePartition> getPartitionsByPartitionValues(List<List<String>> partitionValuesList) {
    return Env.getCurrentEnv().getExtMetaCacheMgr()
            .getMetaStoreCache((HMSExternalCatalog) getCatalog())
            .getAllPartitionsWithCache(getDbName(), getName(), partitionValuesList);
}
/**
 * Convert a partition item into the single-element list of partition values
 * expected by the partition lookup.
 *
 * @param item partition item; it is cast to ListPartitionItem and only its first key is used
 * @return a mutable list containing exactly one list of hive partition values
 */
private List<List<String>> transferPartitionItemToPartitionValues(PartitionItem item) {
    List<List<String>> wrappedValues = Lists.newArrayListWithCapacity(1);
    ListPartitionItem listItem = (ListPartitionItem) item;
    wrappedValues.add(listItem.getItems().get(0).getPartitionValuesAsStringListForHive());
    return wrappedValues;
}
}

View File

@ -38,6 +38,7 @@ import org.apache.doris.mtmv.BaseTableInfo;
import org.apache.doris.mtmv.MTMVPartitionInfo.MTMVPartitionType;
import org.apache.doris.mtmv.MTMVPlanUtil;
import org.apache.doris.mtmv.MTMVRefreshEnum.RefreshMethod;
import org.apache.doris.mtmv.MTMVRefreshPartitionSnapshot;
import org.apache.doris.mtmv.MTMVRelation;
import org.apache.doris.mtmv.MTMVUtil;
import org.apache.doris.nereids.glue.LogicalPlanAdapter;
@ -133,6 +134,7 @@ public class MTMVTask extends AbstractTask {
private MTMV mtmv;
private MTMVRelation relation;
private StmtExecutor executor;
private Map<String, MTMVRefreshPartitionSnapshot> partitionSnapshots;
public MTMVTask() {
}
@ -155,8 +157,9 @@ public class MTMVTask extends AbstractTask {
// Every time a task is run, the relation is regenerated because baseTables and baseViews may change,
// such as deleting a table and creating a view with the same name
this.relation = MTMVPlanUtil.generateMTMVRelation(mtmv, ctx);
// Before obtaining information from hmsTable, refresh to ensure that the data is up-to-date
refreshHmsTable();
// Now, the MTMV first ensures consistency with the data in the cache.
// To be completely consistent with hive, you need to manually refresh the cache
// refreshHmsTable();
if (mtmv.getMvPartitionInfo().getPartitionType() == MTMVPartitionType.FOLLOW_BASE_TABLE) {
MTMVUtil.alignMvPartition(mtmv, mtmv.getMvPartitionInfo().getRelatedTable());
}
@ -171,6 +174,7 @@ public class MTMVTask extends AbstractTask {
int refreshPartitionNum = mtmv.getRefreshPartitionNum();
long execNum = (needRefreshPartitionIds.size() / refreshPartitionNum) + ((needRefreshPartitionIds.size()
% refreshPartitionNum) > 0 ? 1 : 0);
this.partitionSnapshots = Maps.newHashMap();
for (int i = 0; i < execNum; i++) {
int start = i * refreshPartitionNum;
int end = start + refreshPartitionNum;
@ -178,8 +182,11 @@ public class MTMVTask extends AbstractTask {
.subList(start, end > needRefreshPartitionIds.size() ? needRefreshPartitionIds.size() : end));
// need get names before exec
List<String> execPartitionNames = MTMVUtil.getPartitionNamesByIds(mtmv, execPartitionIds);
Map<String, MTMVRefreshPartitionSnapshot> execPartitionSnapshots = MTMVUtil
.generatePartitionSnapshots(mtmv, relation.getBaseTables(), execPartitionIds);
exec(ctx, execPartitionIds, tableWithPartKey);
completedPartitions.addAll(execPartitionNames);
partitionSnapshots.putAll(execPartitionSnapshots);
}
} catch (Throwable e) {
LOG.warn("run task failed: ", e);
@ -241,6 +248,12 @@ public class MTMVTask extends AbstractTask {
}
}
/**
* Before obtaining information from hmsTable, refresh to ensure that the data is up-to-date
*
* @throws AnalysisException
* @throws DdlException
*/
private void refreshHmsTable() throws AnalysisException, DdlException {
for (BaseTableInfo tableInfo : relation.getBaseTables()) {
TableIf tableIf = MTMVUtil.getTable(tableInfo);
@ -345,11 +358,13 @@ public class MTMVTask extends AbstractTask {
private void after() {
if (mtmv != null) {
Env.getCurrentEnv()
.addMTMVTaskResult(new TableNameInfo(mtmv.getQualifiedDbName(), mtmv.getName()), this, relation);
.addMTMVTaskResult(new TableNameInfo(mtmv.getQualifiedDbName(), mtmv.getName()), this, relation,
partitionSnapshots);
}
mtmv = null;
relation = null;
executor = null;
partitionSnapshots = null;
}
private Map<TableIf, String> getIncrementalTableMap() throws AnalysisException {
@ -384,7 +399,7 @@ public class MTMVTask extends AbstractTask {
// check if data is fresh
// We need to use a newly generated relationship and cannot retrieve it using mtmv.getRelation()
// to avoid rebuilding the baseTable and causing a change in the tableId
boolean fresh = MTMVUtil.isMTMVSync(mtmv, relation.getBaseTables(), mtmv.getExcludedTriggerTables(), 0L);
boolean fresh = MTMVUtil.isMTMVSync(mtmv, relation.getBaseTables(), mtmv.getExcludedTriggerTables());
if (fresh) {
return Lists.newArrayList();
}

View File

@ -0,0 +1,59 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.mtmv;
import com.google.common.base.Objects;
import com.google.gson.annotations.SerializedName;
/**
 * Snapshot made of a timestamp and the id of the partition that timestamp belongs to.
 * It is used where a version cannot be obtained (for example from a hive table), so the
 * maximum modify time over all partitions is recorded instead of a version.
 *
 * Two snapshots are equal only if both the partition id and the timestamp are equal.
 */
public class MTMVMaxTimestampSnapshot implements MTMVSnapshotIf {
    // Id of the partition that the timestamp was taken from.
    // It is stored together with the timestamp so that the snapshot also changes when the
    // partition holding the maximum changes (presumably the case the original author meant by
    // "deleting the partition corresponding to timestamp"; confirm).
    @SerializedName("p")
    private long partitionId;
    // The maximum modify time in all partitions
    @SerializedName("t")
    private long timestamp;

    /**
     * @param partitionId id of the partition the timestamp was taken from
     * @param timestamp the maximum modify time
     */
    public MTMVMaxTimestampSnapshot(long partitionId, long timestamp) {
        this.partitionId = partitionId;
        this.timestamp = timestamp;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        // Exact class match: a snapshot of another kind is never equal to this one.
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        MTMVMaxTimestampSnapshot that = (MTMVMaxTimestampSnapshot) o;
        return partitionId == that.partitionId
                && timestamp == that.timestamp;
    }

    @Override
    public int hashCode() {
        return Objects.hashCode(partitionId, timestamp);
    }

    // Readable form for logs and debugging; not used for comparison or persistence.
    @Override
    public String toString() {
        return "MTMVMaxTimestampSnapshot{"
                + "partitionId=" + partitionId
                + ", timestamp=" + timestamp
                + '}';
    }
}

View File

@ -0,0 +1,43 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.mtmv;
import com.google.common.collect.Maps;
import com.google.gson.annotations.SerializedName;
import java.util.Map;
/**
 * Snapshots recorded for one refresh of one MTMV partition: one map for partition
 * snapshots (keyed by String) and one for table snapshots (keyed by Long).
 */
public class MTMVRefreshPartitionSnapshot {
    // Partition snapshots, keyed by a String (presumably the related table's partition name; confirm)
    @SerializedName("p")
    private Map<String, MTMVSnapshotIf> partitions = Maps.newConcurrentMap();
    // Table snapshots, keyed by a Long (presumably the base table id; confirm)
    @SerializedName("t")
    private Map<Long, MTMVSnapshotIf> tables = Maps.newConcurrentMap();

    /**
     * Create an instance with both maps empty.
     */
    public MTMVRefreshPartitionSnapshot() {
    }

    /**
     * @return the partition snapshot map itself (not a copy)
     */
    public Map<String, MTMVSnapshotIf> getPartitions() {
        return this.partitions;
    }

    /**
     * @return the table snapshot map itself (not a copy)
     */
    public Map<Long, MTMVSnapshotIf> getTables() {
        return this.tables;
    }
}

View File

@ -0,0 +1,75 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.mtmv;
import com.google.common.collect.Maps;
import com.google.gson.annotations.SerializedName;
import org.apache.commons.collections.MapUtils;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
/**
 * Snapshots recorded when an MTMV is refreshed, kept per MTMV partition name.
 * A comparison with a current snapshot succeeds only if an equal snapshot was recorded.
 */
public class MTMVRefreshSnapshot {
    // MTMV partition name -> snapshots recorded for that partition
    @SerializedName("ps")
    private Map<String, MTMVRefreshPartitionSnapshot> partitionSnapshots;

    public MTMVRefreshSnapshot() {
        partitionSnapshots = Maps.newConcurrentMap();
    }

    /**
     * Compare the recorded snapshot of a related partition with its current snapshot.
     *
     * @param mtmvPartitionName name of the MTMV partition
     * @param relatedPartitionName name of the related table's partition
     * @param relatedPartitionCurrentSnapshot current snapshot of the related partition
     * @return true only if a snapshot was recorded for both names and it equals the current one
     */
    public boolean equalsWithRelatedPartition(String mtmvPartitionName, String relatedPartitionName,
            MTMVSnapshotIf relatedPartitionCurrentSnapshot) {
        MTMVRefreshPartitionSnapshot recorded = partitionSnapshots.get(mtmvPartitionName);
        if (recorded == null) {
            return false;
        }
        MTMVSnapshotIf recordedPartitionSnapshot = recorded.getPartitions().get(relatedPartitionName);
        return recordedPartitionSnapshot != null
                && recordedPartitionSnapshot.equals(relatedPartitionCurrentSnapshot);
    }

    /**
     * Compare the recorded snapshot of a base table with its current snapshot.
     *
     * @param mtmvPartitionName name of the MTMV partition
     * @param baseTableId id of the base table
     * @param baseTableCurrentSnapshot current snapshot of the base table
     * @return true only if a snapshot was recorded for the partition and table and it equals the current one
     */
    public boolean equalsWithBaseTable(String mtmvPartitionName, long baseTableId,
            MTMVSnapshotIf baseTableCurrentSnapshot) {
        MTMVRefreshPartitionSnapshot recorded = partitionSnapshots.get(mtmvPartitionName);
        if (recorded == null) {
            return false;
        }
        MTMVSnapshotIf recordedTableSnapshot = recorded.getTables().get(baseTableId);
        return recordedTableSnapshot != null
                && recordedTableSnapshot.equals(baseTableCurrentSnapshot);
    }

    /**
     * Merge newly recorded snapshots, then drop entries of partitions that no longer exist.
     *
     * @param addPartitionSnapshots snapshots to add or overwrite; may be null or empty
     * @param mvPartitionNames names of the partitions the MTMV currently has
     */
    public void updateSnapshots(Map<String, MTMVRefreshPartitionSnapshot> addPartitionSnapshots,
            Set<String> mvPartitionNames) {
        boolean nothingToAdd = MapUtils.isEmpty(addPartitionSnapshots);
        if (!nothingToAdd) {
            partitionSnapshots.putAll(addPartitionSnapshots);
        }
        // Remove through the iterator so entries can be dropped while walking the key set.
        Iterator<String> recordedNames = partitionSnapshots.keySet().iterator();
        while (recordedNames.hasNext()) {
            if (mvPartitionNames.contains(recordedNames.next())) {
                continue;
            }
            recordedNames.remove();
        }
    }
}

View File

@ -40,16 +40,6 @@ public interface MTMVRelatedTableIf extends TableIf {
*/
Map<Long, PartitionItem> getPartitionItems();
/**
* Obtain the latest update time of partition data
*
* @param partitionId
* @param item
* @return millisecond
* @throws AnalysisException
*/
long getPartitionLastModifyTime(long partitionId, PartitionItem item) throws AnalysisException;
/**
* getPartitionType LIST/RANGE/UNPARTITIONED
*
@ -65,18 +55,36 @@ public interface MTMVRelatedTableIf extends TableIf {
*/
Set<String> getPartitionColumnNames() throws DdlException;
/**
* Obtain the latest update time of table data
*
* @return
* @throws AnalysisException
*/
long getLastModifyTime() throws AnalysisException;
/**
* getPartitionColumns
*
* @return
*/
List<Column> getPartitionColumns();
/**
* getPartitionSnapshot
*
* @param partitionId
* @return partition snapshot at current time
* @throws AnalysisException
*/
MTMVSnapshotIf getPartitionSnapshot(long partitionId) throws AnalysisException;
/**
* getTableSnapshot
*
* @return table snapshot at current time
* @throws AnalysisException
*/
MTMVSnapshotIf getTableSnapshot() throws AnalysisException;
/**
* getPartitionName
*
* @param partitionId
* @return partitionName
* @throws AnalysisException
*/
String getPartitionName(long partitionId) throws AnalysisException;
}

View File

@ -0,0 +1,24 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.mtmv;
/**
 * MTMV refresh snapshot.
 *
 * Marker interface: it declares no methods. Implementations represent the state of a
 * table or partition at some point in time (for example a version or a timestamp).
 * NOTE(review): snapshots appear to be compared with equals(), so implementations are
 * expected to override equals() and hashCode(); confirm against the callers.
 */
public interface MTMVSnapshotIf {
}

View File

@ -0,0 +1,51 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.mtmv;
import com.google.common.base.Objects;
import com.google.gson.annotations.SerializedName;
/**
 * The version cannot be obtained from the hive table,
 * so the update time is used instead of the version.
 *
 * Two snapshots are equal only if their timestamps are equal.
 */
public class MTMVTimestampSnapshot implements MTMVSnapshotIf {
    // The update time recorded by this snapshot
    @SerializedName("t")
    private long timestamp;

    /**
     * @param timestamp the update time to record
     */
    public MTMVTimestampSnapshot(long timestamp) {
        this.timestamp = timestamp;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        // Exact class match: a snapshot of another kind is never equal to this one.
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        MTMVTimestampSnapshot that = (MTMVTimestampSnapshot) o;
        return timestamp == that.timestamp;
    }

    @Override
    public int hashCode() {
        return Objects.hashCode(timestamp);
    }

    // Readable form for logs and debugging; not used for comparison or persistence.
    @Override
    public String toString() {
        return "MTMVTimestampSnapshot{"
                + "timestamp=" + timestamp
                + '}';
    }
}

View File

@ -73,12 +73,11 @@ public class MTMVUtil {
* @param partitionId
* @param tables
* @param excludedTriggerTables
* @param gracePeriod
* @return
* @throws AnalysisException
*/
private static boolean isMTMVPartitionSync(MTMV mtmv, Long partitionId, Set<BaseTableInfo> tables,
Set<String> excludedTriggerTables, Long gracePeriod) throws AnalysisException {
Set<String> excludedTriggerTables) throws AnalysisException {
boolean isSyncWithPartition = true;
if (mtmv.getMvPartitionInfo().getPartitionType() == MTMVPartitionType.FOLLOW_BASE_TABLE) {
MTMVRelatedTableIf relatedTable = mtmv.getMvPartitionInfo().getRelatedTable();
@ -92,12 +91,9 @@ public class MTMVUtil {
LOG.warn("can not found related partition: " + partitionId);
return false;
}
isSyncWithPartition = isSyncWithPartition(mtmv, partitionId, item, relatedTable, relatedPartitionId,
relatedPartitionItems.get(relatedPartitionId));
isSyncWithPartition = isSyncWithPartition(mtmv, partitionId, relatedTable, relatedPartitionId);
}
return isSyncWithPartition && isFresherThanTables(
mtmv.getPartitionOrAnalysisException(partitionId).getVisibleVersionTimeIgnoreInit(), tables,
excludedTriggerTables, gracePeriod);
return isSyncWithPartition && isSyncWithAllBaseTables(mtmv, partitionId, tables, excludedTriggerTables);
}
@ -158,7 +154,7 @@ public class MTMVUtil {
return false;
}
try {
return isMTMVSync(mtmv, mtmvRelation.getBaseTables(), Sets.newHashSet(), 0L);
return isMTMVSync(mtmv, mtmvRelation.getBaseTables(), Sets.newHashSet());
} catch (AnalysisException e) {
LOG.warn("isMTMVSync failed: ", e);
return false;
@ -171,16 +167,14 @@ public class MTMVUtil {
* @param mtmv
* @param tables
* @param excludeTables
* @param gracePeriod
* @return
* @throws AnalysisException
*/
public static boolean isMTMVSync(MTMV mtmv, Set<BaseTableInfo> tables, Set<String> excludeTables, long gracePeriod)
public static boolean isMTMVSync(MTMV mtmv, Set<BaseTableInfo> tables, Set<String> excludeTables)
throws AnalysisException {
Collection<Partition> partitions = mtmv.getPartitions();
for (Partition partition : partitions) {
if (!isMTMVPartitionSync(mtmv, partition.getId(), tables, excludeTables,
gracePeriod)) {
if (!isMTMVPartitionSync(mtmv, partition.getId(), tables, excludeTables)) {
return false;
}
}
@ -197,7 +191,6 @@ public class MTMVUtil {
*/
public static List<String> getPartitionUnSyncTables(MTMV mtmv, Long partitionId) throws AnalysisException {
List<String> res = Lists.newArrayList();
long maxAvailableTime = mtmv.getPartitionOrAnalysisException(partitionId).getVisibleVersionTimeIgnoreInit();
for (BaseTableInfo baseTableInfo : mtmv.getRelation().getBaseTables()) {
TableIf table = getTable(baseTableInfo);
if (!(table instanceof MTMVRelatedTableIf)) {
@ -213,14 +206,13 @@ public class MTMVUtil {
if (relatedPartitionId == -1L) {
throw new AnalysisException("can not found related partition");
}
boolean isSyncWithPartition = isSyncWithPartition(mtmv, partitionId, item, mtmvRelatedTableIf,
relatedPartitionId, relatedPartitionItems.get(relatedPartitionId));
boolean isSyncWithPartition = isSyncWithPartition(mtmv, partitionId, mtmvRelatedTableIf,
relatedPartitionId);
if (!isSyncWithPartition) {
res.add(mtmvRelatedTableIf.getName());
}
} else {
long tableLastVisibleVersionTime = mtmvRelatedTableIf.getLastModifyTime();
if (tableLastVisibleVersionTime > maxAvailableTime) {
if (!isSyncWithBaseTable(mtmv, partitionId, baseTableInfo)) {
res.add(table.getName());
}
}
@ -257,16 +249,16 @@ public class MTMVUtil {
return res;
}
// check gracePeriod
Long gracePeriod = mtmv.getGracePeriod();
// do not care data is delayed
if (gracePeriod < 0) {
return allPartitions;
}
long gracePeriodMills = mtmv.getGracePeriod();
long currentTimeMills = System.currentTimeMillis();
for (Partition partition : allPartitions) {
if (gracePeriodMills > 0 && currentTimeMills <= (partition.getVisibleVersionTime()
+ gracePeriodMills)) {
res.add(partition);
continue;
}
try {
if (isMTMVPartitionSync(mtmv, partition.getId(), mtmvRelation.getBaseTables(), Sets.newHashSet(),
gracePeriod)) {
if (isMTMVPartitionSync(mtmv, partition.getId(), mtmvRelation.getBaseTables(), Sets.newHashSet())) {
res.add(partition);
}
} catch (AnalysisException e) {
@ -290,8 +282,7 @@ public class MTMVUtil {
for (Partition partition : allPartitions) {
try {
if (!isMTMVPartitionSync(mtmv, partition.getId(), baseTables,
mtmv.getExcludedTriggerTables(),
0L)) {
mtmv.getExcludedTriggerTables())) {
res.add(partition.getId());
}
} catch (AnalysisException e) {
@ -312,11 +303,15 @@ public class MTMVUtil {
* @return
* @throws AnalysisException
*/
private static boolean isSyncWithPartition(MTMV mtmv, Long mtmvPartitionId, PartitionItem mtmvPartitionItem,
private static boolean isSyncWithPartition(MTMV mtmv, Long mtmvPartitionId,
MTMVRelatedTableIf relatedTable,
Long relatedPartitionId, PartitionItem relatedPartitionItem) throws AnalysisException {
return mtmv.getPartitionLastModifyTime(mtmvPartitionId, mtmvPartitionItem) >= relatedTable
.getPartitionLastModifyTime(relatedPartitionId, relatedPartitionItem);
Long relatedPartitionId) throws AnalysisException {
MTMVSnapshotIf relatedPartitionCurrentSnapshot = relatedTable
.getPartitionSnapshot(relatedPartitionId);
String relatedPartitionName = relatedTable.getPartitionName(relatedPartitionId);
String mtmvPartitionName = mtmv.getPartitionName(mtmvPartitionId);
return mtmv.getRefreshSnapshot()
.equalsWithRelatedPartition(mtmvPartitionName, relatedPartitionName, relatedPartitionCurrentSnapshot);
}
/**
@ -343,9 +338,17 @@ public class MTMVUtil {
* @param partitionId
*/
private static void dropPartition(MTMV mtmv, Long partitionId) throws AnalysisException, DdlException {
Partition partition = mtmv.getPartitionOrAnalysisException(partitionId);
DropPartitionClause dropPartitionClause = new DropPartitionClause(false, partition.getName(), false, false);
Env.getCurrentEnv().dropPartition((Database) mtmv.getDatabase(), mtmv, dropPartitionClause);
if (!mtmv.writeLockIfExist()) {
return;
}
try {
Partition partition = mtmv.getPartitionOrAnalysisException(partitionId);
DropPartitionClause dropPartitionClause = new DropPartitionClause(false, partition.getName(), false, false);
Env.getCurrentEnv().dropPartition((Database) mtmv.getDatabase(), mtmv, dropPartitionClause);
} finally {
mtmv.writeUnlock();
}
}
/**
@ -388,15 +391,13 @@ public class MTMVUtil {
/**
* Determine whether the MTMV partition is in sync with the base tables, ignoring excludedTriggerTables and non-OlapTable tables
*
* @param visibleVersionTime
* @param mtmvPartitionId
* @param tables
* @param excludedTriggerTables
* @param gracePeriod
* @return
*/
private static boolean isFresherThanTables(long visibleVersionTime, Set<BaseTableInfo> tables,
Set<String> excludedTriggerTables, Long gracePeriod) throws AnalysisException {
long maxAvailableTime = visibleVersionTime + gracePeriod;
private static boolean isSyncWithAllBaseTables(MTMV mtmv, long mtmvPartitionId, Set<BaseTableInfo> tables,
Set<String> excludedTriggerTables) throws AnalysisException {
for (BaseTableInfo baseTableInfo : tables) {
TableIf table = null;
try {
@ -408,17 +409,36 @@ public class MTMVUtil {
if (excludedTriggerTables.contains(table.getName())) {
continue;
}
if (!(table instanceof MTMVRelatedTableIf)) {
continue;
}
long tableLastVisibleVersionTime = ((MTMVRelatedTableIf) table).getLastModifyTime();
if (tableLastVisibleVersionTime > maxAvailableTime) {
boolean syncWithBaseTable = isSyncWithBaseTable(mtmv, mtmvPartitionId, baseTableInfo);
if (!syncWithBaseTable) {
return false;
}
}
return true;
}
/**
 * Check whether one MTMV partition is in sync with a single base table.
 *
 * @param mtmv materialized view whose refresh snapshot is consulted
 * @param mtmvPartitionId id of the MTMV partition being checked
 * @param baseTableInfo base table to compare against
 * @return {@code false} when the base table cannot be resolved; {@code true} when the table is not a
 *         {@link MTMVRelatedTableIf}; otherwise the result of comparing the table's current snapshot
 *         with the MTMV's refresh snapshot for that partition
 * @throws AnalysisException propagated from the partition-name lookup or the snapshot comparison
 */
private static boolean isSyncWithBaseTable(MTMV mtmv, long mtmvPartitionId, BaseTableInfo baseTableInfo)
        throws AnalysisException {
    TableIf resolved;
    try {
        resolved = getTable(baseTableInfo);
    } catch (AnalysisException e) {
        // A base table that cannot be resolved is reported as "not in sync".
        LOG.warn("get table failed, {}", baseTableInfo, e);
        return false;
    }
    if (resolved instanceof MTMVRelatedTableIf) {
        MTMVRelatedTableIf related = (MTMVRelatedTableIf) resolved;
        MTMVSnapshotIf currentSnapshot = related.getTableSnapshot();
        String partitionName = mtmv.getPartitionName(mtmvPartitionId);
        return mtmv.getRefreshSnapshot().equalsWithBaseTable(partitionName, related.getId(), currentSnapshot);
    }
    // No snapshot can be obtained from other table kinds, so for now they are treated as in sync.
    return true;
}
private static boolean mtmvContainsExternalTable(MTMV mtmv) {
Set<BaseTableInfo> baseTables = mtmv.getRelation().getBaseTables();
for (BaseTableInfo baseTableInfo : baseTables) {
@ -428,4 +448,60 @@ public class MTMVUtil {
}
return false;
}
/**
 * Build the refresh snapshot of each given MTMV partition, keyed by partition name.
 *
 * @param mtmv materialized view the partitions belong to
 * @param baseTables base tables whose snapshots are captured for every partition
 * @param partitionIds ids of the MTMV partitions to snapshot
 * @return a new map from MTMV partition name to that partition's refresh snapshot
 * @throws AnalysisException propagated from the partition-name lookup or from snapshot generation
 */
public static Map<String, MTMVRefreshPartitionSnapshot> generatePartitionSnapshots(MTMV mtmv,
        Set<BaseTableInfo> baseTables, Set<Long> partitionIds)
        throws AnalysisException {
    Map<String, MTMVRefreshPartitionSnapshot> res = Maps.newHashMap();
    for (Long partitionId : partitionIds) {
        // Resolve the name through getPartitionName(), as the sync checks in this class do, instead of
        // dereferencing the result of getPartition() without a null check.
        // NOTE(review): presumably getPartitionName() raises AnalysisException for an unknown id - confirm.
        String partitionName = mtmv.getPartitionName(partitionId);
        res.put(partitionName, generatePartitionSnapshot(mtmv, baseTables, partitionId));
    }
    return res;
}
/**
 * Build the refresh snapshot of a single MTMV partition.
 *
 * <p>For a FOLLOW_BASE_TABLE MTMV, the snapshot of each related-table partition matched to this MTMV
 * partition is stored under that partition's name. After that, the table-level snapshot of every other
 * base table that is a {@link MTMVRelatedTableIf} is stored under the table id; the related table itself
 * is skipped in this second step.
 *
 * @param mtmv materialized view the partition belongs to
 * @param baseTables base tables to capture table-level snapshots from
 * @param partitionId id of the MTMV partition
 * @return the populated snapshot
 * @throws AnalysisException propagated from table resolution or snapshot retrieval
 */
private static MTMVRefreshPartitionSnapshot generatePartitionSnapshot(MTMV mtmv,
        Set<BaseTableInfo> baseTables, Long partitionId)
        throws AnalysisException {
    MTMVRefreshPartitionSnapshot snapshot = new MTMVRefreshPartitionSnapshot();
    if (mtmv.getMvPartitionInfo().getPartitionType() == MTMVPartitionType.FOLLOW_BASE_TABLE) {
        MTMVRelatedTableIf related = mtmv.getMvPartitionInfo().getRelatedTable();
        PartitionItem mtmvItem = mtmv.getPartitionItems().get(partitionId);
        for (Long relatedId : getMTMVPartitionRelatedPartitions(mtmvItem, related)) {
            MTMVSnapshotIf relatedSnapshot = related.getPartitionSnapshot(relatedId);
            snapshot.getPartitions().put(related.getPartitionName(relatedId), relatedSnapshot);
        }
    }
    for (BaseTableInfo baseTableInfo : baseTables) {
        // The related table is tracked per partition above, so it gets no table-level entry.
        boolean isRelatedTable = mtmv.getMvPartitionInfo().getPartitionType() == MTMVPartitionType.FOLLOW_BASE_TABLE
                && mtmv.getMvPartitionInfo().getRelatedTableInfo().equals(baseTableInfo);
        if (isRelatedTable) {
            continue;
        }
        TableIf candidate = getTable(baseTableInfo);
        if (candidate instanceof MTMVRelatedTableIf) {
            MTMVRelatedTableIf relatedCandidate = (MTMVRelatedTableIf) candidate;
            snapshot.getTables().put(candidate.getId(), relatedCandidate.getTableSnapshot());
        }
    }
    return snapshot;
}
/**
 * Find the related-table partition whose partition item equals the given MTMV partition item.
 *
 * @param mtmvPartitionItem partition item of the MTMV partition
 * @param relatedTable table whose partition items are searched
 * @return a list holding the id of the first matching partition, or an empty list if none matches
 */
private static List<Long> getMTMVPartitionRelatedPartitions(PartitionItem mtmvPartitionItem,
        MTMVRelatedTableIf relatedTable) {
    List<Long> matched = Lists.newArrayList();
    for (Entry<Long, PartitionItem> candidate : relatedTable.getPartitionItems().entrySet()) {
        if (!mtmvPartitionItem.equals(candidate.getValue())) {
            continue;
        }
        // Currently MTMV partitions map one-to-one onto related-table partitions,
        // so the search stops at the first match.
        matched.add(candidate.getKey());
        break;
    }
    return matched;
}
}

View File

@ -0,0 +1,47 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.mtmv;
import com.google.common.base.Objects;
import com.google.gson.annotations.SerializedName;
/**
 * {@link MTMVSnapshotIf} implementation that identifies a snapshot by a version number.
 *
 * <p>Two instances are equal exactly when their versions are equal. The version is serialized
 * under the key {@code "v"}.
 */
public class MTMVVersionSnapshot implements MTMVSnapshotIf {
    @SerializedName("v")
    private long version;

    /**
     * @param version version number captured by this snapshot
     */
    public MTMVVersionSnapshot(long version) {
        this.version = version;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        MTMVVersionSnapshot that = (MTMVVersionSnapshot) o;
        return version == that.version;
    }

    @Override
    public int hashCode() {
        return Objects.hashCode(version);
    }

    /**
     * Render the captured version so that logs and debuggers show the value rather than an identity hash.
     */
    @Override
    public String toString() {
        return "MTMVVersionSnapshot{"
                + "version=" + version
                + '}';
    }
}

View File

@ -22,6 +22,7 @@ import org.apache.doris.common.io.Writable;
import org.apache.doris.job.extensions.mtmv.MTMVTask;
import org.apache.doris.mtmv.MTMVAlterOpType;
import org.apache.doris.mtmv.MTMVRefreshInfo;
import org.apache.doris.mtmv.MTMVRefreshPartitionSnapshot;
import org.apache.doris.mtmv.MTMVRelation;
import org.apache.doris.mtmv.MTMVStatus;
import org.apache.doris.nereids.trees.plans.commands.info.TableNameInfo;
@ -52,6 +53,8 @@ public class AlterMTMV implements Writable {
private MTMVTask task;
@SerializedName("r")
private MTMVRelation relation;
@SerializedName("ps")
private Map<String, MTMVRefreshPartitionSnapshot> partitionSnapshots;
public AlterMTMV(TableNameInfo mvName, MTMVRefreshInfo refreshInfo, MTMVAlterOpType opType) {
this.mvName = Objects.requireNonNull(mvName, "require mvName object");
@ -125,6 +128,15 @@ public class AlterMTMV implements Writable {
this.relation = relation;
}
/**
 * @return the partition snapshots attached to this alter operation, exactly as stored (no copy is made)
 */
public Map<String, MTMVRefreshPartitionSnapshot> getPartitionSnapshots() {
    return this.partitionSnapshots;
}
/**
 * Attach partition snapshots to this alter operation; the given map is stored as-is (no copy is made).
 *
 * @param partitionSnapshots snapshots to attach
 */
public void setPartitionSnapshots(Map<String, MTMVRefreshPartitionSnapshot> partitionSnapshots) {
    this.partitionSnapshots = partitionSnapshots;
}
@Override
public String toString() {
return "AlterMTMV{"

View File

@ -92,6 +92,10 @@ import org.apache.doris.load.routineload.AbstractDataSourceProperties;
import org.apache.doris.load.routineload.kafka.KafkaDataSourceProperties;
import org.apache.doris.load.sync.SyncJob;
import org.apache.doris.load.sync.canal.CanalSyncJob;
import org.apache.doris.mtmv.MTMVMaxTimestampSnapshot;
import org.apache.doris.mtmv.MTMVSnapshotIf;
import org.apache.doris.mtmv.MTMVTimestampSnapshot;
import org.apache.doris.mtmv.MTMVVersionSnapshot;
import org.apache.doris.policy.Policy;
import org.apache.doris.policy.RowPolicy;
import org.apache.doris.policy.StoragePolicy;
@ -241,6 +245,12 @@ public class GsonUtils {
.registerSubtype(InsertJob.class, InsertJob.class.getSimpleName())
.registerSubtype(MTMVJob.class, MTMVJob.class.getSimpleName());
// Runtime type adapter for MTMVSnapshotIf, created with the type field name "clazz"; every concrete
// snapshot class is registered under its simple class name.
// NOTE(review): presumably a new MTMVSnapshotIf implementation must be registered here before it can be
// (de)serialized, and renaming a registered class changes its label - confirm against
// RuntimeTypeAdapterFactory.
private static RuntimeTypeAdapterFactory<MTMVSnapshotIf> mtmvSnapshotTypeAdapterFactory =
RuntimeTypeAdapterFactory.of(MTMVSnapshotIf.class, "clazz")
.registerSubtype(MTMVMaxTimestampSnapshot.class, MTMVMaxTimestampSnapshot.class.getSimpleName())
.registerSubtype(MTMVTimestampSnapshot.class, MTMVTimestampSnapshot.class.getSimpleName())
.registerSubtype(MTMVVersionSnapshot.class, MTMVVersionSnapshot.class.getSimpleName());
private static RuntimeTypeAdapterFactory<DatabaseIf> dbTypeAdapterFactory = RuntimeTypeAdapterFactory.of(
DatabaseIf.class, "clazz")
.registerSubtype(ExternalDatabase.class, ExternalDatabase.class.getSimpleName())
@ -299,6 +309,7 @@ public class GsonUtils {
.registerTypeAdapterFactory(hbResponseTypeAdapterFactory)
.registerTypeAdapterFactory(rdsTypeAdapterFactory)
.registerTypeAdapterFactory(jobExecutorRuntimeTypeAdapterFactory)
.registerTypeAdapterFactory(mtmvSnapshotTypeAdapterFactory)
.registerTypeAdapterFactory(constraintTypeAdapterFactory)
.registerTypeAdapter(ImmutableMap.class, new ImmutableMapDeserializer())
.registerTypeAdapter(AtomicBoolean.class, new AtomicBooleanAdapter())