branch-2.1: [enhance](mtmv)MTMV interface optimization (#43329)
Cherry-picked from #43086 Co-authored-by: zhangdong <493738387@qq.com>
This commit is contained in:
committed by
GitHub
parent
80fd76677e
commit
ea67e3a6b4
@ -3075,11 +3075,6 @@ public class OlapTable extends Table implements MTMVRelatedTableIf {
|
||||
return new MTMVVersionSnapshot(visibleVersion, id);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean needAutoRefresh() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isPartitionColumnAllowNull() {
|
||||
return true;
|
||||
|
||||
@ -20,6 +20,7 @@ package org.apache.doris.datasource.hive;
|
||||
import org.apache.doris.catalog.Column;
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.catalog.ListPartitionItem;
|
||||
import org.apache.doris.catalog.MTMV;
|
||||
import org.apache.doris.catalog.PartitionItem;
|
||||
import org.apache.doris.catalog.PartitionType;
|
||||
import org.apache.doris.catalog.PrimitiveType;
|
||||
@ -27,10 +28,12 @@ import org.apache.doris.catalog.ScalarType;
|
||||
import org.apache.doris.catalog.Type;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.common.Config;
|
||||
import org.apache.doris.common.DdlException;
|
||||
import org.apache.doris.datasource.ExternalTable;
|
||||
import org.apache.doris.datasource.SchemaCacheValue;
|
||||
import org.apache.doris.datasource.hudi.HudiUtils;
|
||||
import org.apache.doris.datasource.iceberg.IcebergUtils;
|
||||
import org.apache.doris.mtmv.MTMVBaseTableIf;
|
||||
import org.apache.doris.mtmv.MTMVMaxTimestampSnapshot;
|
||||
import org.apache.doris.mtmv.MTMVRefreshContext;
|
||||
import org.apache.doris.mtmv.MTMVRelatedTableIf;
|
||||
@ -87,7 +90,7 @@ import java.util.stream.Collectors;
|
||||
/**
|
||||
* Hive metastore external table.
|
||||
*/
|
||||
public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableIf {
|
||||
public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableIf, MTMVBaseTableIf {
|
||||
private static final Logger LOG = LogManager.getLogger(HMSExternalTable.class);
|
||||
|
||||
public static final Set<String> SUPPORTED_HIVE_FILE_FORMATS;
|
||||
@ -587,9 +590,9 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI
|
||||
case ICEBERG:
|
||||
if (GlobalVariable.enableFetchIcebergStats) {
|
||||
return StatisticsUtil.getIcebergColumnStats(colName,
|
||||
Env.getCurrentEnv().getExtMetaCacheMgr().getIcebergMetadataCache().getIcebergTable(
|
||||
catalog, dbName, name
|
||||
));
|
||||
Env.getCurrentEnv().getExtMetaCacheMgr().getIcebergMetadataCache().getIcebergTable(
|
||||
catalog, dbName, name
|
||||
));
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
@ -821,11 +824,6 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI
|
||||
return partition;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean needAutoRefresh() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isPartitionColumnAllowNull() {
|
||||
return true;
|
||||
@ -960,4 +958,10 @@ public class HMSExternalTable extends ExternalTable implements MTMVRelatedTableI
|
||||
makeSureInitialized();
|
||||
return !isView() && remoteTable.getPartitionKeysSize() > 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void beforeMTMVRefresh(MTMV mtmv) throws DdlException {
|
||||
Env.getCurrentEnv().getRefreshManager()
|
||||
.refreshTable(getCatalog().getName(), getDbName(), getName(), true);
|
||||
}
|
||||
}
|
||||
|
||||
@ -28,11 +28,11 @@ import org.apache.doris.common.FeConstants;
|
||||
import org.apache.doris.common.UserException;
|
||||
import org.apache.doris.common.util.DebugUtil;
|
||||
import org.apache.doris.common.util.TimeUtils;
|
||||
import org.apache.doris.datasource.hive.HMSExternalTable;
|
||||
import org.apache.doris.job.common.TaskStatus;
|
||||
import org.apache.doris.job.exception.JobException;
|
||||
import org.apache.doris.job.task.AbstractTask;
|
||||
import org.apache.doris.mtmv.BaseTableInfo;
|
||||
import org.apache.doris.mtmv.MTMVBaseTableIf;
|
||||
import org.apache.doris.mtmv.MTMVPartitionInfo.MTMVPartitionType;
|
||||
import org.apache.doris.mtmv.MTMVPartitionUtil;
|
||||
import org.apache.doris.mtmv.MTMVPlanUtil;
|
||||
@ -171,9 +171,7 @@ public class MTMVTask extends AbstractTask {
|
||||
// Every time a task is run, the relation is regenerated because baseTables and baseViews may change,
|
||||
// such as deleting a table and creating a view with the same name
|
||||
this.relation = MTMVPlanUtil.generateMTMVRelation(mtmv, ctx);
|
||||
// Now, the MTMV first ensures consistency with the data in the cache.
|
||||
// To be completely consistent with hive, you need to manually refresh the cache
|
||||
refreshHmsTable();
|
||||
beforeMTMVRefresh();
|
||||
if (mtmv.getMvPartitionInfo().getPartitionType() != MTMVPartitionType.SELF_MANAGE) {
|
||||
MTMVPartitionUtil.alignMvPartition(mtmv);
|
||||
}
|
||||
@ -278,20 +276,18 @@ public class MTMVTask extends AbstractTask {
|
||||
}
|
||||
|
||||
/**
|
||||
* Before obtaining information from hmsTable, refresh to ensure that the data is up-to-date
|
||||
* Do something before refreshing, such as clearing the cache of the external table
|
||||
*
|
||||
* @throws AnalysisException
|
||||
* @throws DdlException
|
||||
*/
|
||||
private void refreshHmsTable() throws AnalysisException, DdlException {
|
||||
private void beforeMTMVRefresh() throws AnalysisException, DdlException {
|
||||
for (BaseTableInfo tableInfo : relation.getBaseTablesOneLevel()) {
|
||||
TableIf tableIf = MTMVUtil.getTable(tableInfo);
|
||||
if (tableIf instanceof HMSExternalTable) {
|
||||
HMSExternalTable hmsTable = (HMSExternalTable) tableIf;
|
||||
Env.getCurrentEnv().getRefreshManager()
|
||||
.refreshTable(hmsTable.getCatalog().getName(), hmsTable.getDbName(), hmsTable.getName(), true);
|
||||
if (tableIf instanceof MTMVBaseTableIf) {
|
||||
MTMVBaseTableIf baseTableIf = (MTMVBaseTableIf) tableIf;
|
||||
baseTableIf.beforeMTMVRefresh(mtmv);
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -0,0 +1,36 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.mtmv;
|
||||
|
||||
import org.apache.doris.catalog.MTMV;
|
||||
import org.apache.doris.common.DdlException;
|
||||
|
||||
/**
|
||||
* Base tables of a materialized view should implement this interface to perform table-specific work when necessary;
|
||||
* implementing it is currently optional
|
||||
*/
|
||||
public interface MTMVBaseTableIf {
|
||||
|
||||
/**
|
||||
* Do something before refreshing the MTMV
|
||||
*
|
||||
* @param mtmv the materialized view that is about to be refreshed
|
||||
* @throws DdlException
|
||||
*/
|
||||
void beforeMTMVRefresh(MTMV mtmv) throws DdlException;
|
||||
}
|
||||
@ -35,8 +35,10 @@ public interface MTMVRelatedTableIf extends TableIf {
|
||||
|
||||
/**
|
||||
* Get all partitions of the table
|
||||
* Note: This method is called every time there is a refresh and transparent rewrite,
|
||||
* so if this method is slow, it will significantly reduce query performance
|
||||
*
|
||||
* @return partitionId->PartitionItem
|
||||
* @return partitionName->PartitionItem
|
||||
*/
|
||||
Map<String, PartitionItem> getAndCopyPartitionItems();
|
||||
|
||||
@ -64,8 +66,12 @@ public interface MTMVRelatedTableIf extends TableIf {
|
||||
|
||||
/**
|
||||
* getPartitionSnapshot
|
||||
* It is best to use the version. If there is no version, use the last update time
|
||||
* If snapshots have already been obtained in bulk in the context,
|
||||
* the results should be obtained directly from the context
|
||||
*
|
||||
* @param partitionName
|
||||
* @param context
|
||||
* @return partition snapshot at current time
|
||||
* @throws AnalysisException
|
||||
*/
|
||||
@ -73,7 +79,11 @@ public interface MTMVRelatedTableIf extends TableIf {
|
||||
|
||||
/**
|
||||
* getTableSnapshot
|
||||
* It is best to use the version. If there is no version, use the last update time
|
||||
* If snapshots have already been obtained in bulk in the context,
|
||||
* the results should be obtained directly from the context
|
||||
*
|
||||
* @param context
|
||||
* @return table snapshot at current time
|
||||
* @throws AnalysisException
|
||||
*/
|
||||
@ -85,7 +95,9 @@ public interface MTMVRelatedTableIf extends TableIf {
|
||||
* @return If false, the method that checks whether the table is synchronized returns true directly;
|
||||
* otherwise the snapshot information will be compared
|
||||
*/
|
||||
boolean needAutoRefresh();
|
||||
default boolean needAutoRefresh() {
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Whether the partition column is allowed to be null (`isAllowNull`)
|
||||
|
||||
Reference in New Issue
Block a user