zhangdong
2024-11-29 20:53:01 +08:00
committed by GitHub
parent c3707dbdee
commit e03517e4e5
27 changed files with 384 additions and 106 deletions

View File

@ -47,6 +47,7 @@ import org.apache.doris.common.io.Text;
import org.apache.doris.common.util.PropertyAnalyzer;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.datasource.mvcc.MvccSnapshot;
import org.apache.doris.mtmv.MTMVRefreshContext;
import org.apache.doris.mtmv.MTMVRelatedTableIf;
import org.apache.doris.mtmv.MTMVSnapshotIf;
@ -983,6 +984,10 @@ public class OlapTable extends Table implements MTMVRelatedTableIf {
}
@Override
public Set<String> getPartitionColumnNames(Optional<MvccSnapshot> snapshot) throws DdlException {
return getPartitionColumnNames();
}
public Set<String> getPartitionColumnNames() throws DdlException {
Set<String> partitionColumnNames = Sets.newHashSet();
if (partitionInfo instanceof SinglePartitionInfo) {
@ -3001,11 +3006,20 @@ public class OlapTable extends Table implements MTMVRelatedTableIf {
}
@Override
public PartitionType getPartitionType(Optional<MvccSnapshot> snapshot) {
return getPartitionType();
}
public PartitionType getPartitionType() {
return partitionInfo.getType();
}
@Override
public Map<String, PartitionItem> getAndCopyPartitionItems(Optional<MvccSnapshot> snapshot)
throws AnalysisException {
return getAndCopyPartitionItems();
}
public Map<String, PartitionItem> getAndCopyPartitionItems() throws AnalysisException {
if (!tryReadLock(1, TimeUnit.MINUTES)) {
throw new AnalysisException(
@ -3026,12 +3040,17 @@ public class OlapTable extends Table implements MTMVRelatedTableIf {
}
@Override
public List<Column> getPartitionColumns(Optional<MvccSnapshot> snapshot) {
return getPartitionColumns();
}
public List<Column> getPartitionColumns() {
return getPartitionInfo().getPartitionColumns();
}
@Override
public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context)
public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context,
Optional<MvccSnapshot> snapshot)
throws AnalysisException {
Map<String, Long> partitionVersions = context.getBaseVersions().getPartitionVersions();
long partitionId = getPartitionOrAnalysisException(partitionName).getId();
@ -3041,7 +3060,7 @@ public class OlapTable extends Table implements MTMVRelatedTableIf {
}
@Override
public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context) {
public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context, Optional<MvccSnapshot> snapshot) {
Map<Long, Long> tableVersions = context.getBaseVersions().getTableVersions();
long visibleVersion = tableVersions.containsKey(id) ? tableVersions.get(id) : getVisibleVersion();
return new MTMVVersionSnapshot(visibleVersion, id);

View File

@ -20,6 +20,7 @@ package org.apache.doris.datasource;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.PartitionItem;
import org.apache.doris.catalog.TableAttributes;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.constraint.Constraint;
@ -28,6 +29,8 @@ import org.apache.doris.common.Pair;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.mvcc.MvccSnapshot;
import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan.SelectedPartitions;
import org.apache.doris.persist.gson.GsonPostProcessable;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.statistics.AnalysisInfo;
@ -40,6 +43,7 @@ import org.apache.doris.thrift.TTableDescriptor;
import com.google.common.collect.Lists;
import com.google.gson.annotations.SerializedName;
import lombok.Getter;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -47,6 +51,7 @@ import org.apache.logging.log4j.Logger;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@ -371,4 +376,52 @@ public class ExternalTable implements TableIf, Writable, GsonPostProcessable {
ExternalSchemaCache cache = Env.getCurrentEnv().getExtMetaCacheMgr().getSchemaCache(catalog);
return cache.getSchemaValue(dbName, name);
}
/**
* Retrieve all partitions and initialize SelectedPartitions
*
* @param snapshot if not support mvcc, ignore this
* @return
*/
public SelectedPartitions initSelectedPartitions(Optional<MvccSnapshot> snapshot) {
if (!supportPartitionPruned()) {
return SelectedPartitions.NOT_PRUNED;
}
if (CollectionUtils.isEmpty(this.getPartitionColumns(snapshot))) {
return SelectedPartitions.NOT_PRUNED;
}
Map<String, PartitionItem> nameToPartitionItems = getNameToPartitionItems(snapshot);
return new SelectedPartitions(nameToPartitionItems.size(), nameToPartitionItems, false);
}
/**
* get partition map
* If partition related operations are supported, this method needs to be implemented in the subclass
*
* @param snapshot if not support mvcc, ignore this
* @return partitionName ==> PartitionItem
*/
public Map<String, PartitionItem> getNameToPartitionItems(Optional<MvccSnapshot> snapshot) {
return Collections.emptyMap();
}
/**
* get partition column list
* If partition related operations are supported, this method needs to be implemented in the subclass
*
* @param snapshot if not support mvcc, ignore this
* @return
*/
public List<Column> getPartitionColumns(Optional<MvccSnapshot> snapshot) {
return Collections.emptyList();
}
/**
* Does it support partition cpruned, If so, this method needs to be overridden in subclasses
*
* @return
*/
public boolean supportPartitionPruned() {
return false;
}
}

View File

@ -33,6 +33,7 @@ import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.SchemaCacheValue;
import org.apache.doris.datasource.hudi.HudiUtils;
import org.apache.doris.datasource.iceberg.IcebergUtils;
import org.apache.doris.datasource.mvcc.MvccSnapshot;
import org.apache.doris.mtmv.MTMVBaseTableIf;
import org.apache.doris.mtmv.MTMVMaxTimestampSnapshot;
import org.apache.doris.mtmv.MTMVRefreshContext;
@ -40,7 +41,6 @@ import org.apache.doris.mtmv.MTMVRelatedTableIf;
import org.apache.doris.mtmv.MTMVSnapshotIf;
import org.apache.doris.mtmv.MTMVTimestampSnapshot;
import org.apache.doris.nereids.exceptions.NotSupportedException;
import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan.SelectedPartitions;
import org.apache.doris.qe.GlobalVariable;
import org.apache.doris.statistics.AnalysisInfo;
import org.apache.doris.statistics.BaseAnalysisTask;
@ -288,7 +288,6 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI
.orElse(Collections.emptyList());
}
@Override
public List<Column> getPartitionColumns() {
makeSureInitialized();
Optional<SchemaCacheValue> schemaCacheValue = getSchemaCacheValue();
@ -296,19 +295,38 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI
.orElse(Collections.emptyList());
}
public SelectedPartitions getAllPartitions() {
if (CollectionUtils.isEmpty(this.getPartitionColumns())) {
return SelectedPartitions.NOT_PRUNED;
}
@Override
public List<Column> getPartitionColumns(Optional<MvccSnapshot> snapshot) {
return getPartitionColumns();
}
@Override
public boolean supportPartitionPruned() {
return getDlaType() == DLAType.HIVE;
}
@Override
public Map<String, PartitionItem> getNameToPartitionItems(Optional<MvccSnapshot> snapshot) {
return getNameToPartitionItems();
}
public Map<String, PartitionItem> getNameToPartitionItems() {
if (CollectionUtils.isEmpty(this.getPartitionColumns())) {
return Collections.emptyMap();
}
HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
.getMetaStoreCache((HMSExternalCatalog) this.getCatalog());
List<Type> partitionColumnTypes = this.getPartitionColumnTypes();
HiveMetaStoreCache.HivePartitionValues hivePartitionValues = cache.getPartitionValues(
this.getDbName(), this.getName(), partitionColumnTypes);
Map<Long, PartitionItem> idToPartitionItem = hivePartitionValues.getIdToPartitionItem();
return new SelectedPartitions(idToPartitionItem.size(), idToPartitionItem, false);
// transfer id to name
BiMap<Long, String> idToName = hivePartitionValues.getPartitionNameToIdMap().inverse();
Map<String, PartitionItem> nameToPartitionItem = Maps.newHashMapWithExpectedSize(idToPartitionItem.size());
for (Entry<Long, PartitionItem> entry : idToPartitionItem.entrySet()) {
nameToPartitionItem.put(idToName.get(entry.getKey()), entry.getValue());
}
return nameToPartitionItem;
}
public boolean isHiveTransactionalTable() {
@ -739,34 +757,33 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI
}
@Override
public PartitionType getPartitionType(Optional<MvccSnapshot> snapshot) {
return getPartitionType();
}
public PartitionType getPartitionType() {
return getPartitionColumns().size() > 0 ? PartitionType.LIST : PartitionType.UNPARTITIONED;
}
@Override
public Set<String> getPartitionColumnNames(Optional<MvccSnapshot> snapshot) {
return getPartitionColumnNames();
}
public Set<String> getPartitionColumnNames() {
return getPartitionColumns().stream()
.map(c -> c.getName().toLowerCase()).collect(Collectors.toSet());
}
@Override
public Map<String, PartitionItem> getAndCopyPartitionItems() {
HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
.getMetaStoreCache((HMSExternalCatalog) getCatalog());
HiveMetaStoreCache.HivePartitionValues hivePartitionValues = cache.getPartitionValues(
getDbName(), getName(), getPartitionColumnTypes());
Map<String, PartitionItem> res = Maps.newHashMap();
Map<Long, PartitionItem> idToPartitionItem = hivePartitionValues.getIdToPartitionItem();
BiMap<Long, String> idToName = hivePartitionValues.getPartitionNameToIdMap().inverse();
for (Entry<Long, PartitionItem> entry : idToPartitionItem.entrySet()) {
res.put(idToName.get(entry.getKey()), entry.getValue());
}
return res;
public Map<String, PartitionItem> getAndCopyPartitionItems(Optional<MvccSnapshot> snapshot) {
return getNameToPartitionItems();
}
@Override
public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context)
throws AnalysisException {
public MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context,
Optional<MvccSnapshot> snapshot) throws AnalysisException {
HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
.getMetaStoreCache((HMSExternalCatalog) getCatalog());
HiveMetaStoreCache.HivePartitionValues hivePartitionValues = cache.getPartitionValues(
@ -778,7 +795,8 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI
}
@Override
public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context) throws AnalysisException {
public MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context, Optional<MvccSnapshot> snapshot)
throws AnalysisException {
if (getPartitionType() == PartitionType.UNPARTITIONED) {
return new MTMVMaxTimestampSnapshot(getName(), getLastDdlTime());
}

View File

@ -0,0 +1,25 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.datasource.mvcc;
/**
* The snapshot information of mvcc is defined by each table,
* but it should be ensured that the table information queried through this snapshot remains unchanged
*/
public interface MvccSnapshot {
}

View File

@ -0,0 +1,33 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.datasource.mvcc;
import org.apache.doris.catalog.TableIf;
/**
* The table that needs to query data based on the version needs to implement this interface.
*/
public interface MvccTable extends TableIf {
/**
* Retrieve the current snapshot information of the table,
* and the returned result will be used for the entire process of this query
*
* @return MvccSnapshot
*/
MvccSnapshot loadSnapshot();
}

View File

@ -0,0 +1,84 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.datasource.mvcc;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.datasource.CatalogIf;
import com.google.common.base.Objects;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public class MvccTableInfo {
private static final Logger LOG = LogManager.getLogger(MvccTableInfo.class);
private String tableName;
private String dbName;
private String ctlName;
public MvccTableInfo(TableIf table) {
java.util.Objects.requireNonNull(table, "table is null");
DatabaseIf database = table.getDatabase();
java.util.Objects.requireNonNull(database, "database is null");
CatalogIf catalog = database.getCatalog();
java.util.Objects.requireNonNull(database, "catalog is null");
this.tableName = table.getName();
this.dbName = database.getFullName();
this.ctlName = catalog.getName();
}
public String getTableName() {
return tableName;
}
public String getDbName() {
return dbName;
}
public String getCtlName() {
return ctlName;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
MvccTableInfo that = (MvccTableInfo) o;
return Objects.equal(tableName, that.tableName) && Objects.equal(
dbName, that.dbName) && Objects.equal(ctlName, that.ctlName);
}
@Override
public int hashCode() {
return Objects.hashCode(tableName, dbName, ctlName);
}
@Override
public String toString() {
return "MvccTableInfo{"
+ "tableName='" + tableName + '\''
+ ", dbName='" + dbName + '\''
+ ", ctlName='" + ctlName + '\''
+ '}';
}
}

View File

@ -69,7 +69,7 @@ public class MTMVPartitionExprDateTrunc implements MTMVPartitionExprService {
String.format("timeUnit not support: %s, only support: %s", this.timeUnit, timeUnits));
}
MTMVRelatedTableIf relatedTable = mvPartitionInfo.getRelatedTable();
PartitionType partitionType = relatedTable.getPartitionType();
PartitionType partitionType = relatedTable.getPartitionType(Optional.empty());
if (partitionType == PartitionType.RANGE) {
Type partitionColumnType = MTMVPartitionUtil
.getPartitionColumnType(mvPartitionInfo.getRelatedTable(), mvPartitionInfo.getRelatedCol());

View File

@ -25,6 +25,7 @@ import org.apache.doris.datasource.CatalogMgr;
import com.google.gson.annotations.SerializedName;
import java.util.List;
import java.util.Optional;
/**
* MTMVPartitionInfo
@ -115,7 +116,7 @@ public class MTMVPartitionInfo {
if (partitionType == MTMVPartitionType.SELF_MANAGE) {
throw new AnalysisException("partitionType is: " + partitionType);
}
List<Column> partitionColumns = getRelatedTable().getPartitionColumns();
List<Column> partitionColumns = getRelatedTable().getPartitionColumns(Optional.empty());
for (int i = 0; i < partitionColumns.size(); i++) {
if (partitionColumns.get(i).getName().equalsIgnoreCase(relatedCol)) {
return i;

View File

@ -50,6 +50,7 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@ -329,7 +330,7 @@ public class MTMVPartitionUtil {
}
for (String relatedPartitionName : relatedPartitionNames) {
MTMVSnapshotIf relatedPartitionCurrentSnapshot = relatedTable
.getPartitionSnapshot(relatedPartitionName, context);
.getPartitionSnapshot(relatedPartitionName, context, Optional.empty());
if (!mtmv.getRefreshSnapshot()
.equalsWithRelatedPartition(mtmvPartitionName, relatedPartitionName,
relatedPartitionCurrentSnapshot)) {
@ -445,7 +446,7 @@ public class MTMVPartitionUtil {
if (!baseTable.needAutoRefresh()) {
return true;
}
MTMVSnapshotIf baseTableCurrentSnapshot = baseTable.getTableSnapshot(context);
MTMVSnapshotIf baseTableCurrentSnapshot = baseTable.getTableSnapshot(context, Optional.empty());
return mtmv.getRefreshSnapshot()
.equalsWithBaseTable(mtmvPartitionName, new BaseTableInfo(baseTable), baseTableCurrentSnapshot);
}
@ -481,7 +482,7 @@ public class MTMVPartitionUtil {
MTMVRelatedTableIf relatedTable = mtmv.getMvPartitionInfo().getRelatedTable();
for (String relatedPartitionName : relatedPartitionNames) {
MTMVSnapshotIf partitionSnapshot = relatedTable
.getPartitionSnapshot(relatedPartitionName, context);
.getPartitionSnapshot(relatedPartitionName, context, Optional.empty());
refreshPartitionSnapshot.getPartitions()
.put(relatedPartitionName, partitionSnapshot);
}
@ -496,13 +497,13 @@ public class MTMVPartitionUtil {
continue;
}
refreshPartitionSnapshot.addTableSnapshot(baseTableInfo,
((MTMVRelatedTableIf) table).getTableSnapshot(context));
((MTMVRelatedTableIf) table).getTableSnapshot(context, Optional.empty()));
}
return refreshPartitionSnapshot;
}
public static Type getPartitionColumnType(MTMVRelatedTableIf relatedTable, String col) throws AnalysisException {
List<Column> partitionColumns = relatedTable.getPartitionColumns();
List<Column> partitionColumns = relatedTable.getPartitionColumns(Optional.empty());
for (Column column : partitionColumns) {
if (column.getName().equals(col)) {
return column.getType();

View File

@ -20,6 +20,7 @@ package org.apache.doris.mtmv;
import org.apache.doris.common.AnalysisException;
import java.util.Map;
import java.util.Optional;
/**
* get all related partition descs
@ -29,6 +30,6 @@ public class MTMVRelatedPartitionDescInitGenerator implements MTMVRelatedPartiti
@Override
public void apply(MTMVPartitionInfo mvPartitionInfo, Map<String, String> mvProperties,
RelatedPartitionDescResult lastResult) throws AnalysisException {
lastResult.setItems(mvPartitionInfo.getRelatedTable().getAndCopyPartitionItems());
lastResult.setItems(mvPartitionInfo.getRelatedTable().getAndCopyPartitionItems(Optional.empty()));
}
}

View File

@ -31,6 +31,7 @@ import com.google.common.collect.Sets;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
/**
@ -45,7 +46,7 @@ public class MTMVRelatedPartitionDescRollUpGenerator implements MTMVRelatedParti
return;
}
MTMVRelatedTableIf relatedTable = mvPartitionInfo.getRelatedTable();
PartitionType partitionType = relatedTable.getPartitionType();
PartitionType partitionType = relatedTable.getPartitionType(Optional.empty());
if (partitionType == PartitionType.RANGE) {
lastResult.setDescs(rollUpRange(lastResult.getDescs(), mvPartitionInfo));
} else if (partitionType == PartitionType.LIST) {

View File

@ -23,9 +23,11 @@ import org.apache.doris.catalog.PartitionType;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.datasource.mvcc.MvccSnapshot;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
/**
@ -38,31 +40,35 @@ public interface MTMVRelatedTableIf extends TableIf {
* Note: This method is called every time there is a refresh and transparent rewrite,
* so if this method is slow, it will significantly reduce query performance
*
* @param snapshot
* @return partitionName->PartitionItem
*/
Map<String, PartitionItem> getAndCopyPartitionItems() throws AnalysisException;
Map<String, PartitionItem> getAndCopyPartitionItems(Optional<MvccSnapshot> snapshot) throws AnalysisException;
/**
* getPartitionType LIST/RANGE/UNPARTITIONED
*
* @param snapshot
* @return
*/
PartitionType getPartitionType();
PartitionType getPartitionType(Optional<MvccSnapshot> snapshot);
/**
* getPartitionColumnNames
*
* @param snapshot
* @return
* @throws DdlException
*/
Set<String> getPartitionColumnNames() throws DdlException;
Set<String> getPartitionColumnNames(Optional<MvccSnapshot> snapshot) throws DdlException;
/**
* getPartitionColumns
*
* @param snapshot
* @return
*/
List<Column> getPartitionColumns();
List<Column> getPartitionColumns(Optional<MvccSnapshot> snapshot);
/**
* getPartitionSnapshot
@ -70,12 +76,14 @@ public interface MTMVRelatedTableIf extends TableIf {
* If snapshots have already been obtained in bulk in the context,
* the results should be obtained directly from the context
*
* @param snapshot
* @param partitionName
* @param context
* @return partition snapshot at current time
* @throws AnalysisException
*/
MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context) throws AnalysisException;
MTMVSnapshotIf getPartitionSnapshot(String partitionName, MTMVRefreshContext context,
Optional<MvccSnapshot> snapshot) throws AnalysisException;
/**
* getTableSnapshot
@ -83,11 +91,13 @@ public interface MTMVRelatedTableIf extends TableIf {
* If snapshots have already been obtained in bulk in the context,
* the results should be obtained directly from the context
*
* @param snapshot
* @param context
* @return table snapshot at current time
* @throws AnalysisException
*/
MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context) throws AnalysisException;
MTMVSnapshotIf getTableSnapshot(MTMVRefreshContext context, Optional<MvccSnapshot> snapshot)
throws AnalysisException;
/**
* Does the current type of table allow timed triggering

View File

@ -72,6 +72,7 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import org.apache.commons.collections.MapUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -478,6 +479,13 @@ public class CascadesContext implements ScheduleContext {
return tableNames;
}
public Map<Long, TableIf> getOrExtractTables(LogicalPlan logicalPlan) {
if (MapUtils.isEmpty(tables)) {
extractTables(logicalPlan);
}
return tables;
}
private Set<List<String>> extractTableNamesFromHaving(LogicalHaving<?> having) {
Set<SubqueryExpr> subqueryExprs = having.getPredicate()
.collect(SubqueryExpr.class::isInstance);

View File

@ -200,7 +200,7 @@ public class NereidsPlanner extends Planner {
plan = preprocess(plan);
initCascadesContext(plan, requireProperties);
statementContext.loadSnapshots(cascadesContext.getOrExtractTables(plan));
try (Lock lock = new Lock(plan, cascadesContext)) {
Plan resultPlan = planWithoutLock(plan, explainLevel, showPlanProcess, requireProperties);
lockCallback.accept(resultPlan);

View File

@ -24,6 +24,9 @@ import org.apache.doris.common.FormatOptions;
import org.apache.doris.common.Id;
import org.apache.doris.common.IdGenerator;
import org.apache.doris.common.Pair;
import org.apache.doris.datasource.mvcc.MvccSnapshot;
import org.apache.doris.datasource.mvcc.MvccTable;
import org.apache.doris.datasource.mvcc.MvccTableInfo;
import org.apache.doris.nereids.hint.Hint;
import org.apache.doris.nereids.memo.Group;
import org.apache.doris.nereids.rules.analysis.ColumnAliasGenerator;
@ -166,6 +169,8 @@ public class StatementContext implements Closeable {
private List<PlannerHook> plannerHooks = new ArrayList<>();
private final Map<MvccTableInfo, MvccSnapshot> snapshots = Maps.newHashMap();
public StatementContext() {
this(ConnectContext.get(), null, 0);
}
@ -500,6 +505,32 @@ public class StatementContext implements Closeable {
this.plannerHooks.add(plannerHook);
}
/**
* Load snapshot information of mvcc
*
* @param tables Tables used in queries
*/
public void loadSnapshots(Map<Long, TableIf> tables) {
if (tables == null) {
return;
}
for (TableIf tableIf : tables.values()) {
if (tableIf instanceof MvccTable) {
snapshots.put(new MvccTableInfo(tableIf), ((MvccTable) tableIf).loadSnapshot());
}
}
}
/**
* Obtain snapshot information of mvcc
*
* @param mvccTable mvccTable
* @return MvccSnapshot
*/
public MvccSnapshot getSnapshot(MvccTable mvccTable) {
return snapshots.get(new MvccTableInfo(mvccTable));
}
private static class CloseableResource implements Closeable {
public final String resourceName;
public final String threadName;

View File

@ -429,7 +429,6 @@ public class BindRelation extends OneAnalysisRuleFactory {
} else {
return new LogicalFileScan(unboundRelation.getRelationId(), (HMSExternalTable) table,
qualifierWithoutTableName,
((HMSExternalTable) table).getAllPartitions(),
unboundRelation.getTableSample(),
unboundRelation.getTableSnapshot());
}

View File

@ -433,13 +433,13 @@ public class MaterializedViewUtils {
return null;
}
MTMVRelatedTableIf relatedTable = (MTMVRelatedTableIf) table;
PartitionType type = relatedTable.getPartitionType();
PartitionType type = relatedTable.getPartitionType(Optional.empty());
if (PartitionType.UNPARTITIONED.equals(type)) {
context.addFailReason(String.format("related base table is not partition table, the table is %s",
table.getName()));
return null;
}
Set<Column> partitionColumnSet = new HashSet<>(relatedTable.getPartitionColumns());
Set<Column> partitionColumnSet = new HashSet<>(relatedTable.getPartitionColumns(Optional.empty()));
Column mvReferenceColumn = contextPartitionColumn.getColumn().get();
Expr definExpr = mvReferenceColumn.getDefineExpr();
if (definExpr instanceof SlotRef) {

View File

@ -35,16 +35,16 @@ import java.util.Objects;
import java.util.stream.IntStream;
/** OneListPartitionInputs */
public class OneListPartitionEvaluator
extends DefaultExpressionRewriter<Map<Slot, PartitionSlotInput>> implements OnePartitionEvaluator {
private final long partitionId;
public class OneListPartitionEvaluator<K>
extends DefaultExpressionRewriter<Map<Slot, PartitionSlotInput>> implements OnePartitionEvaluator<K> {
private final K partitionIdent;
private final List<Slot> partitionSlots;
private final ListPartitionItem partitionItem;
private final ExpressionRewriteContext expressionRewriteContext;
public OneListPartitionEvaluator(long partitionId, List<Slot> partitionSlots,
public OneListPartitionEvaluator(K partitionIdent, List<Slot> partitionSlots,
ListPartitionItem partitionItem, CascadesContext cascadesContext) {
this.partitionId = partitionId;
this.partitionIdent = partitionIdent;
this.partitionSlots = Objects.requireNonNull(partitionSlots, "partitionSlots cannot be null");
this.partitionItem = Objects.requireNonNull(partitionItem, "partitionItem cannot be null");
this.expressionRewriteContext = new ExpressionRewriteContext(
@ -52,8 +52,8 @@ public class OneListPartitionEvaluator
}
@Override
public long getPartitionId() {
return partitionId;
public K getPartitionIdent() {
return partitionIdent;
}
@Override

View File

@ -25,8 +25,8 @@ import java.util.List;
import java.util.Map;
/** the evaluator of the partition which represent one partition */
public interface OnePartitionEvaluator {
long getPartitionId();
public interface OnePartitionEvaluator<K> {
K getPartitionIdent();
/**
* return a slot to expression mapping to replace the input.

View File

@ -80,10 +80,10 @@ import java.util.function.BiFunction;
*
* you can see the process steps in the comment of PartitionSlotInput.columnRanges
*/
public class OneRangePartitionEvaluator
public class OneRangePartitionEvaluator<K>
extends ExpressionVisitor<EvaluateRangeResult, EvaluateRangeInput>
implements OnePartitionEvaluator {
private final long partitionId;
implements OnePartitionEvaluator<K> {
private final K partitionIdent;
private final List<Slot> partitionSlots;
private final RangePartitionItem partitionItem;
private final ExpressionRewriteContext expressionRewriteContext;
@ -95,9 +95,9 @@ public class OneRangePartitionEvaluator
private final Map<Slot, PartitionSlotType> slotToType;
/** OneRangePartitionEvaluator */
public OneRangePartitionEvaluator(long partitionId, List<Slot> partitionSlots,
public OneRangePartitionEvaluator(K partitionIdent, List<Slot> partitionSlots,
RangePartitionItem partitionItem, CascadesContext cascadesContext, int expandThreshold) {
this.partitionId = partitionId;
this.partitionIdent = partitionIdent;
this.partitionSlots = Objects.requireNonNull(partitionSlots, "partitionSlots cannot be null");
this.partitionItem = Objects.requireNonNull(partitionItem, "partitionItem cannot be null");
this.expressionRewriteContext = new ExpressionRewriteContext(
@ -155,8 +155,8 @@ public class OneRangePartitionEvaluator
}
@Override
public long getPartitionId() {
return partitionId;
public K getPartitionIdent() {
return partitionIdent;
}
@Override

View File

@ -102,21 +102,21 @@ public class PartitionPruner extends DefaultExpressionRewriter<Void> {
}
/** prune */
public List<Long> prune() {
Builder<Long> scanPartitionIds = ImmutableList.builder();
public <K> List<K> prune() {
Builder<K> scanPartitionIdents = ImmutableList.builder();
for (OnePartitionEvaluator partition : partitions) {
if (!canBePrunedOut(partition)) {
scanPartitionIds.add(partition.getPartitionId());
scanPartitionIdents.add((K) partition.getPartitionIdent());
}
}
return scanPartitionIds.build();
return scanPartitionIdents.build();
}
/**
* prune partition with `idToPartitions` as parameter.
*/
public static List<Long> prune(List<Slot> partitionSlots, Expression partitionPredicate,
Map<Long, PartitionItem> idToPartitions, CascadesContext cascadesContext,
public static <K> List<K> prune(List<Slot> partitionSlots, Expression partitionPredicate,
Map<K, PartitionItem> idToPartitions, CascadesContext cascadesContext,
PartitionTableType partitionTableType) {
partitionPredicate = PartitionPruneExpressionExtractor.extract(
partitionPredicate, ImmutableSet.copyOf(partitionSlots), cascadesContext);
@ -135,7 +135,7 @@ public class PartitionPruner extends DefaultExpressionRewriter<Void> {
}
List<OnePartitionEvaluator> evaluators = Lists.newArrayListWithCapacity(idToPartitions.size());
for (Entry<Long, PartitionItem> kv : idToPartitions.entrySet()) {
for (Entry<K, PartitionItem> kv : idToPartitions.entrySet()) {
evaluators.add(toPartitionEvaluator(
kv.getKey(), kv.getValue(), partitionSlots, cascadesContext, expandThreshold));
}
@ -147,7 +147,7 @@ public class PartitionPruner extends DefaultExpressionRewriter<Void> {
/**
* convert partition item to partition evaluator
*/
public static final OnePartitionEvaluator toPartitionEvaluator(long id, PartitionItem partitionItem,
public static final <K> OnePartitionEvaluator<K> toPartitionEvaluator(K id, PartitionItem partitionItem,
List<Slot> partitionSlots, CascadesContext cascadesContext, int expandThreshold) {
if (partitionItem instanceof ListPartitionItem) {
return new OneListPartitionEvaluator(

View File

@ -28,18 +28,18 @@ import java.util.List;
import java.util.Map;
/** UnknownPartitionEvaluator */
public class UnknownPartitionEvaluator implements OnePartitionEvaluator {
private final long partitionId;
public class UnknownPartitionEvaluator<K> implements OnePartitionEvaluator<K> {
private final K partitionIdent;
private final PartitionItem partitionItem;
public UnknownPartitionEvaluator(long partitionId, PartitionItem partitionItem) {
this.partitionId = partitionId;
public UnknownPartitionEvaluator(K partitionId, PartitionItem partitionItem) {
this.partitionIdent = partitionId;
this.partitionItem = partitionItem;
}
@Override
public long getPartitionId() {
return partitionId;
public K getPartitionIdent() {
return partitionIdent;
}
@Override

View File

@ -19,8 +19,6 @@ package org.apache.doris.nereids.rules.rewrite;
import org.apache.doris.catalog.PartitionItem;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.datasource.hive.HMSExternalTable.DLAType;
import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.rules.Rule;
import org.apache.doris.nereids.rules.RuleType;
@ -38,6 +36,7 @@ import org.apache.commons.collections.CollectionUtils;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
@ -60,10 +59,8 @@ public class PruneFileScanPartition extends OneRewriteRuleFactory {
ExternalTable tbl = scan.getTable();
SelectedPartitions selectedPartitions;
// TODO(cmy): support other external table
if (tbl instanceof HMSExternalTable && ((HMSExternalTable) tbl).getDlaType() == DLAType.HIVE) {
HMSExternalTable hiveTbl = (HMSExternalTable) tbl;
selectedPartitions = pruneHivePartitions(hiveTbl, filter, scan, ctx.cascadesContext);
if (tbl.supportPartitionPruned()) {
selectedPartitions = pruneExternalPartitions(tbl, filter, scan, ctx.cascadesContext);
} else {
// set isPruned so that it won't go pass the partition prune again
selectedPartitions = new SelectedPartitions(0, ImmutableMap.of(), true);
@ -74,10 +71,11 @@ public class PruneFileScanPartition extends OneRewriteRuleFactory {
}).toRule(RuleType.FILE_SCAN_PARTITION_PRUNE);
}
private SelectedPartitions pruneHivePartitions(HMSExternalTable hiveTbl,
private SelectedPartitions pruneExternalPartitions(ExternalTable externalTable,
LogicalFilter<LogicalFileScan> filter, LogicalFileScan scan, CascadesContext ctx) {
Map<Long, PartitionItem> selectedPartitionItems = Maps.newHashMap();
if (CollectionUtils.isEmpty(hiveTbl.getPartitionColumns())) {
Map<String, PartitionItem> selectedPartitionItems = Maps.newHashMap();
// todo: real snapshotId
if (CollectionUtils.isEmpty(externalTable.getPartitionColumns(Optional.empty()))) {
// non partitioned table, return NOT_PRUNED.
// non partition table will be handled in HiveScanNode.
return SelectedPartitions.NOT_PRUNED;
@ -85,19 +83,19 @@ public class PruneFileScanPartition extends OneRewriteRuleFactory {
Map<String, Slot> scanOutput = scan.getOutput()
.stream()
.collect(Collectors.toMap(slot -> slot.getName().toLowerCase(), Function.identity()));
List<Slot> partitionSlots = hiveTbl.getPartitionColumns()
// todo: real snapshotId
List<Slot> partitionSlots = externalTable.getPartitionColumns(Optional.empty())
.stream()
.map(column -> scanOutput.get(column.getName().toLowerCase()))
.collect(Collectors.toList());
Map<Long, PartitionItem> idToPartitionItem = scan.getSelectedPartitions().selectedPartitions;
List<Long> prunedPartitions = new ArrayList<>(PartitionPruner.prune(
partitionSlots, filter.getPredicate(), idToPartitionItem, ctx, PartitionTableType.HIVE));
Map<String, PartitionItem> nameToPartitionItem = scan.getSelectedPartitions().selectedPartitions;
List<String> prunedPartitions = new ArrayList<>(PartitionPruner.prune(
partitionSlots, filter.getPredicate(), nameToPartitionItem, ctx, PartitionTableType.HIVE));
for (Long id : prunedPartitions) {
selectedPartitionItems.put(id, idToPartitionItem.get(id));
for (String name : prunedPartitions) {
selectedPartitionItems.put(name, nameToPartitionItem.get(name));
}
return new SelectedPartitions(idToPartitionItem.size(), selectedPartitionItems, true);
return new SelectedPartitions(nameToPartitionItem.size(), selectedPartitionItems, true);
}
}

View File

@ -351,7 +351,7 @@ public class CreateMTMVInfo {
allPartitionDescs.size(), ctx.getSessionVariable().getCreateTablePartitionMaxNum()));
}
try {
PartitionType type = relatedTable.getPartitionType();
PartitionType type = relatedTable.getPartitionType(Optional.empty());
if (type == PartitionType.RANGE) {
return new RangePartitionDesc(Lists.newArrayList(mvPartitionInfo.getPartitionCol()),
allPartitionDescs);

View File

@ -54,6 +54,7 @@ import org.apache.doris.qe.SessionVariable;
import com.google.common.collect.Sets;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
@ -147,7 +148,7 @@ public class MTMVPartitionDefinition {
MTMVRelatedTableIf mtmvBaseRealtedTable = MTMVUtil.getRelatedTable(relatedTableInfo.getTableInfo());
Set<String> partitionColumnNames = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER);
try {
partitionColumnNames.addAll(mtmvBaseRealtedTable.getPartitionColumnNames());
partitionColumnNames.addAll(mtmvBaseRealtedTable.getPartitionColumnNames(Optional.empty()));
} catch (DdlException e) {
throw new AnalysisException(e.getMessage(), e);
}

View File

@ -59,17 +59,11 @@ public class LogicalFileScan extends LogicalCatalogRelation {
this.tableSnapshot = tableSnapshot;
}
public LogicalFileScan(RelationId id, ExternalTable table, List<String> qualifier,
SelectedPartitions selectedPartitions,
Optional<TableSample> tableSample, Optional<TableSnapshot> tableSnapshot) {
this(id, table, qualifier, Optional.empty(), Optional.empty(),
selectedPartitions, tableSample, tableSnapshot);
}
public LogicalFileScan(RelationId id, ExternalTable table, List<String> qualifier,
Optional<TableSample> tableSample, Optional<TableSnapshot> tableSnapshot) {
// todo: real snapshotId
this(id, table, qualifier, Optional.empty(), Optional.empty(),
SelectedPartitions.NOT_PRUNED, tableSample, tableSnapshot);
table.initSelectedPartitions(Optional.empty()), tableSample, tableSnapshot);
}
public SelectedPartitions getSelectedPartitions() {
@ -147,9 +141,9 @@ public class LogicalFileScan extends LogicalCatalogRelation {
*/
public final long totalPartitionNum;
/**
* partition id -> partition item
* partition name -> partition item
*/
public final Map<Long, PartitionItem> selectedPartitions;
public final Map<String, PartitionItem> selectedPartitions;
/**
* true means the result is after partition pruning
* false means the partition pruning is not processed.
@ -159,7 +153,7 @@ public class LogicalFileScan extends LogicalCatalogRelation {
/**
* Constructor for SelectedPartitions.
*/
public SelectedPartitions(long totalPartitionNum, Map<Long, PartitionItem> selectedPartitions,
public SelectedPartitions(long totalPartitionNum, Map<String, PartitionItem> selectedPartitions,
boolean isPruned) {
this.totalPartitionNum = totalPartitionNum;
this.selectedPartitions = ImmutableMap.copyOf(Objects.requireNonNull(selectedPartitions,

View File

@ -35,6 +35,7 @@ import org.junit.Before;
import org.junit.Test;
import java.util.List;
import java.util.Optional;
import java.util.Set;
public class MTMVPartitionUtilTest {
@ -112,7 +113,7 @@ public class MTMVPartitionUtilTest {
minTimes = 0;
result = true;
baseOlapTable.getTableSnapshot((MTMVRefreshContext) any);
baseOlapTable.getTableSnapshot((MTMVRefreshContext) any, (Optional) any);
minTimes = 0;
result = baseSnapshotIf;
@ -132,7 +133,7 @@ public class MTMVPartitionUtilTest {
minTimes = 0;
result = true;
baseOlapTable.getPartitionSnapshot(anyString, (MTMVRefreshContext) any);
baseOlapTable.getPartitionSnapshot(anyString, (MTMVRefreshContext) any, (Optional) any);
minTimes = 0;
result = baseSnapshotIf;