[enhance](mtmv)Mv refresh on commit (#35702)
pick from master #34548 The modification involving CloudGlobalTransactionMgr was not picked up to 2.1 because the 2.1 branch does not yet have the Thunderbolt CloudGlobalTransactionMgr
This commit is contained in:
@ -141,6 +141,8 @@ import org.apache.doris.deploy.DeployManager;
|
||||
import org.apache.doris.deploy.impl.AmbariDeployManager;
|
||||
import org.apache.doris.deploy.impl.K8sDeployManager;
|
||||
import org.apache.doris.deploy.impl.LocalFileDeployManager;
|
||||
import org.apache.doris.event.EventProcessor;
|
||||
import org.apache.doris.event.ReplacePartitionEvent;
|
||||
import org.apache.doris.ha.BDBHA;
|
||||
import org.apache.doris.ha.FrontendNodeType;
|
||||
import org.apache.doris.ha.HAProtocol;
|
||||
@ -529,6 +531,7 @@ public class Env {
|
||||
private TopicPublisherThread topicPublisherThread;
|
||||
|
||||
private MTMVService mtmvService;
|
||||
private EventProcessor eventProcessor;
|
||||
|
||||
private InsertOverwriteManager insertOverwriteManager;
|
||||
|
||||
@ -772,6 +775,7 @@ public class Env {
|
||||
this.topicPublisherThread = new TopicPublisherThread(
|
||||
"TopicPublisher", Config.publish_topic_info_interval_ms, systemInfo);
|
||||
this.mtmvService = new MTMVService();
|
||||
this.eventProcessor = new EventProcessor(mtmvService);
|
||||
this.insertOverwriteManager = new InsertOverwriteManager();
|
||||
this.dnsCache = new DNSCache();
|
||||
this.sqlCacheManager = new NereidsSqlCacheManager();
|
||||
@ -839,6 +843,10 @@ public class Env {
|
||||
return mtmvService;
|
||||
}
|
||||
|
||||
public EventProcessor getEventProcessor() {
|
||||
return eventProcessor;
|
||||
}
|
||||
|
||||
public InsertOverwriteManager getInsertOverwriteManager() {
|
||||
return insertOverwriteManager;
|
||||
}
|
||||
@ -5547,6 +5555,18 @@ public class Env {
|
||||
long version = olapTable.getNextVersion();
|
||||
long versionTime = System.currentTimeMillis();
|
||||
olapTable.updateVisibleVersionAndTime(version, versionTime);
|
||||
// Here, we only wait for the EventProcessor to finish processing the event,
|
||||
// but regardless of the success or failure of the result,
|
||||
// it does not affect the logic of replace the partition
|
||||
try {
|
||||
Env.getCurrentEnv().getEventProcessor().processEvent(
|
||||
new ReplacePartitionEvent(db.getCatalog().getId(), db.getId(),
|
||||
olapTable.getId()));
|
||||
} catch (Throwable t) {
|
||||
// According to normal logic, no exceptions will be thrown,
|
||||
// but in order to avoid bugs affecting the original logic, all exceptions are caught
|
||||
LOG.warn("produceEvent failed: ", t);
|
||||
}
|
||||
// write log
|
||||
ReplacePartitionOperationLog info =
|
||||
new ReplacePartitionOperationLog(db.getId(), db.getFullName(), olapTable.getId(), olapTable.getName(),
|
||||
|
||||
@ -139,6 +139,7 @@ import org.apache.doris.datasource.es.EsRepository;
|
||||
import org.apache.doris.datasource.hive.HMSCachedClient;
|
||||
import org.apache.doris.datasource.hive.HiveMetadataOps;
|
||||
import org.apache.doris.datasource.property.constants.HMSProperties;
|
||||
import org.apache.doris.event.DropPartitionEvent;
|
||||
import org.apache.doris.nereids.trees.plans.commands.info.DropMTMVInfo;
|
||||
import org.apache.doris.nereids.trees.plans.commands.info.TableNameInfo;
|
||||
import org.apache.doris.persist.AlterDatabasePropertyInfo;
|
||||
@ -1809,11 +1810,22 @@ public class InternalCatalog implements CatalogIf<Database> {
|
||||
long version = olapTable.getNextVersion();
|
||||
long versionTime = System.currentTimeMillis();
|
||||
olapTable.updateVisibleVersionAndTime(version, versionTime);
|
||||
// Here, we only wait for the EventProcessor to finish processing the event,
|
||||
// but regardless of the success or failure of the result,
|
||||
// it does not affect the logic of deleting the partition
|
||||
try {
|
||||
Env.getCurrentEnv().getEventProcessor().processEvent(
|
||||
new DropPartitionEvent(db.getCatalog().getId(), db.getId(),
|
||||
olapTable.getId()));
|
||||
} catch (Throwable t) {
|
||||
// According to normal logic, no exceptions will be thrown,
|
||||
// but in order to avoid bugs affecting the original logic, all exceptions are caught
|
||||
LOG.warn("produceEvent failed: ", t);
|
||||
}
|
||||
// log
|
||||
DropPartitionInfo info = new DropPartitionInfo(db.getId(), olapTable.getId(), partitionName, isTempPartition,
|
||||
clause.isForceDrop(), recycleTime, version, versionTime);
|
||||
Env.getCurrentEnv().getEditLog().logDropPartition(info);
|
||||
|
||||
LOG.info("succeed in dropping partition[{}], table : [{}-{}], is temp : {}, is force : {}",
|
||||
partitionName, olapTable.getId(), olapTable.getName(), isTempPartition, clause.isForceDrop());
|
||||
}
|
||||
|
||||
@ -0,0 +1,24 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.event;
|
||||
|
||||
public class DataChangeEvent extends TableEvent {
|
||||
public DataChangeEvent(long ctlId, long dbId, long tableId) {
|
||||
super(EventType.DATA_CHANGE, ctlId, dbId, tableId);
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,24 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.event;
|
||||
|
||||
public class DropPartitionEvent extends TableEvent {
|
||||
public DropPartitionEvent(long ctlId, long dbId, long tableId) {
|
||||
super(EventType.DROP_PARTITION, ctlId, dbId, tableId);
|
||||
}
|
||||
}
|
||||
60
fe/fe-core/src/main/java/org/apache/doris/event/Event.java
Normal file
60
fe/fe-core/src/main/java/org/apache/doris/event/Event.java
Normal file
@ -0,0 +1,60 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.event;
|
||||
|
||||
import org.apache.doris.catalog.Env;
|
||||
|
||||
import java.util.Objects;
|
||||
|
||||
public abstract class Event {
|
||||
protected final long eventId;
|
||||
|
||||
// eventTime of the event. Used instead of calling getter on event everytime
|
||||
protected final long eventTime;
|
||||
|
||||
// eventType from the NotificationEvent
|
||||
protected final EventType eventType;
|
||||
|
||||
protected Event(EventType eventType) {
|
||||
Objects.requireNonNull(eventType, "require eventType");
|
||||
this.eventId = Env.getCurrentEnv().getNextId();
|
||||
this.eventTime = System.currentTimeMillis();
|
||||
this.eventType = eventType;
|
||||
}
|
||||
|
||||
public long getEventId() {
|
||||
return eventId;
|
||||
}
|
||||
|
||||
public long getEventTime() {
|
||||
return eventTime;
|
||||
}
|
||||
|
||||
public EventType getEventType() {
|
||||
return eventType;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "Event{"
|
||||
+ "eventId=" + eventId
|
||||
+ ", eventTime=" + eventTime
|
||||
+ ", eventType=" + eventType
|
||||
+ '}';
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,34 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
|
||||
package org.apache.doris.event;
|
||||
|
||||
public class EventException extends Exception {
|
||||
|
||||
public EventException(String msg, Throwable cause) {
|
||||
super(msg, cause);
|
||||
}
|
||||
|
||||
public EventException(String msg) {
|
||||
super(msg);
|
||||
}
|
||||
|
||||
public EventException(Exception e) {
|
||||
super(e);
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,23 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.event;
|
||||
|
||||
public interface EventListener {
|
||||
|
||||
void processEvent(Event event) throws EventException;
|
||||
}
|
||||
@ -0,0 +1,57 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.event;
|
||||
|
||||
import com.google.common.collect.Sets;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
|
||||
public class EventProcessor {
|
||||
|
||||
private static final Logger LOG = LogManager.getLogger(EventProcessor.class);
|
||||
|
||||
private Set<EventListener> listeners = Sets.newHashSet();
|
||||
|
||||
public EventProcessor(EventListener... args) {
|
||||
for (EventListener listener : args) {
|
||||
this.listeners.add(listener);
|
||||
}
|
||||
}
|
||||
|
||||
public boolean processEvent(Event event) {
|
||||
Objects.requireNonNull(event);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("processEvent: {}", event);
|
||||
}
|
||||
boolean result = true;
|
||||
for (EventListener listener : listeners) {
|
||||
try {
|
||||
listener.processEvent(event);
|
||||
} catch (EventException e) {
|
||||
// A listener processing failure does not affect other listeners
|
||||
LOG.warn("[{}] process event failed, event: {}, errMsg: {}", listener.getClass().getName(), event,
|
||||
e.getMessage());
|
||||
result = false;
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,24 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.event;
|
||||
|
||||
public enum EventType {
|
||||
DATA_CHANGE,
|
||||
REPLACE_PARTITION,
|
||||
DROP_PARTITION
|
||||
}
|
||||
@ -0,0 +1,24 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.event;
|
||||
|
||||
public class ReplacePartitionEvent extends TableEvent {
|
||||
public ReplacePartitionEvent(long ctlId, long dbId, long tableId) {
|
||||
super(EventType.REPLACE_PARTITION, ctlId, dbId, tableId);
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,52 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package org.apache.doris.event;
|
||||
|
||||
public abstract class TableEvent extends Event {
|
||||
protected final long ctlId;
|
||||
protected final long dbId;
|
||||
protected final long tableId;
|
||||
|
||||
public TableEvent(EventType eventType, long ctlId, long dbId, long tableId) {
|
||||
super(eventType);
|
||||
this.ctlId = ctlId;
|
||||
this.dbId = dbId;
|
||||
this.tableId = tableId;
|
||||
}
|
||||
|
||||
public long getCtlId() {
|
||||
return ctlId;
|
||||
}
|
||||
|
||||
public long getDbId() {
|
||||
return dbId;
|
||||
}
|
||||
|
||||
public long getTableId() {
|
||||
return tableId;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "TableEvent{"
|
||||
+ "ctlId=" + ctlId
|
||||
+ ", dbId=" + dbId
|
||||
+ ", tableId=" + tableId
|
||||
+ "} " + super.toString();
|
||||
}
|
||||
}
|
||||
@ -140,27 +140,44 @@ public class MTMVJob extends AbstractJob<MTMVTask, MTMVTaskContext> {
|
||||
|
||||
/**
|
||||
* if user trigger, return true
|
||||
* if system trigger, Check if there are any system triggered tasks, and if so, return false
|
||||
* else, only can have 2 task. because every task can refresh all data.
|
||||
*
|
||||
* @param taskContext
|
||||
* @return
|
||||
*/
|
||||
@Override
|
||||
public boolean isReadyForScheduling(MTMVTaskContext taskContext) {
|
||||
if (taskContext != null) {
|
||||
if (isManual(taskContext)) {
|
||||
return true;
|
||||
}
|
||||
List<MTMVTask> runningTasks = getRunningTasks();
|
||||
int runningNum = 0;
|
||||
for (MTMVTask task : runningTasks) {
|
||||
if (task.getTaskContext() == null || task.getTaskContext().getTriggerMode() == MTMVTaskTriggerMode.SYSTEM) {
|
||||
LOG.warn("isReadyForScheduling return false, because current taskContext is null, exist task: {}",
|
||||
task);
|
||||
return false;
|
||||
if (!isManual(task.getTaskContext())) {
|
||||
runningNum++;
|
||||
// Prerequisite: Each refresh will calculate which partitions to refresh
|
||||
//
|
||||
// For example, there is currently a running task that is refreshing partition p1.
|
||||
// If the data of p2 changes at this time and triggers a refresh task t2,
|
||||
// according to the logic (>=1), t2 will be lost
|
||||
//
|
||||
// If the logic is >=2, t2 will wait lock of MTMVJob.
|
||||
// If the p3 data changes again and triggers the refresh task t3,
|
||||
// then t3 will be discarded. However, when t2 runs, both p2 and p3 data will be refreshed.
|
||||
if (runningNum >= 2) {
|
||||
LOG.warn("isReadyForScheduling return false, because current taskContext is null, exist task: {}",
|
||||
task);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
private boolean isManual(MTMVTaskContext taskContext) {
|
||||
return taskContext != null && taskContext.getTriggerMode() == MTMVTaskTriggerMode.MANUAL;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ShowResultSetMetaData getJobMetaData() {
|
||||
return JOB_META_DATA;
|
||||
|
||||
@ -107,6 +107,7 @@ public class MTMVTask extends AbstractTask {
|
||||
|
||||
public enum MTMVTaskTriggerMode {
|
||||
MANUAL,
|
||||
COMMIT,
|
||||
SYSTEM
|
||||
}
|
||||
|
||||
|
||||
@ -32,18 +32,24 @@ public class BaseTableInfo {
|
||||
private static final Logger LOG = LogManager.getLogger(BaseTableInfo.class);
|
||||
|
||||
@SerializedName("ti")
|
||||
private Long tableId;
|
||||
private long tableId;
|
||||
@SerializedName("di")
|
||||
private Long dbId;
|
||||
private long dbId;
|
||||
@SerializedName("ci")
|
||||
private Long ctlId;
|
||||
private long ctlId;
|
||||
|
||||
public BaseTableInfo(Long tableId, Long dbId) {
|
||||
public BaseTableInfo(long tableId, long dbId) {
|
||||
this.tableId = java.util.Objects.requireNonNull(tableId, "tableId is null");
|
||||
this.dbId = java.util.Objects.requireNonNull(dbId, "dbId is null");
|
||||
this.ctlId = InternalCatalog.INTERNAL_CATALOG_ID;
|
||||
}
|
||||
|
||||
public BaseTableInfo(long tableId, long dbId, long ctlId) {
|
||||
this.tableId = java.util.Objects.requireNonNull(tableId, "tableId is null");
|
||||
this.dbId = java.util.Objects.requireNonNull(dbId, "dbId is null");
|
||||
this.ctlId = java.util.Objects.requireNonNull(ctlId, "ctlId is null");
|
||||
}
|
||||
|
||||
public BaseTableInfo(TableIf table) {
|
||||
DatabaseIf database = table.getDatabase();
|
||||
java.util.Objects.requireNonNull(database, "database is null");
|
||||
@ -54,15 +60,15 @@ public class BaseTableInfo {
|
||||
this.ctlId = catalog.getId();
|
||||
}
|
||||
|
||||
public Long getTableId() {
|
||||
public long getTableId() {
|
||||
return tableId;
|
||||
}
|
||||
|
||||
public Long getDbId() {
|
||||
public long getDbId() {
|
||||
return dbId;
|
||||
}
|
||||
|
||||
public Long getCtlId() {
|
||||
public long getCtlId() {
|
||||
return ctlId;
|
||||
}
|
||||
|
||||
|
||||
@ -45,6 +45,7 @@ import org.apache.doris.nereids.trees.plans.commands.info.TableNameInfo;
|
||||
import org.apache.doris.persist.AlterMTMV;
|
||||
import org.apache.doris.qe.ConnectContext;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import org.apache.commons.collections.CollectionUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
|
||||
@ -79,11 +80,10 @@ public class MTMVJobManager implements MTMVHookService {
|
||||
|
||||
private JobExecutionConfiguration getJobConfig(MTMV mtmv) {
|
||||
JobExecutionConfiguration jobExecutionConfiguration = new JobExecutionConfiguration();
|
||||
if (mtmv.getRefreshInfo().getRefreshTriggerInfo().getRefreshTrigger()
|
||||
.equals(RefreshTrigger.SCHEDULE)) {
|
||||
RefreshTrigger refreshTrigger = mtmv.getRefreshInfo().getRefreshTriggerInfo().getRefreshTrigger();
|
||||
if (refreshTrigger.equals(RefreshTrigger.SCHEDULE)) {
|
||||
setScheduleJobConfig(jobExecutionConfiguration, mtmv);
|
||||
} else if (mtmv.getRefreshInfo().getRefreshTriggerInfo().getRefreshTrigger()
|
||||
.equals(RefreshTrigger.MANUAL)) {
|
||||
} else if (refreshTrigger.equals(RefreshTrigger.MANUAL) || refreshTrigger.equals(RefreshTrigger.COMMIT)) {
|
||||
setManualJobConfig(jobExecutionConfiguration, mtmv);
|
||||
}
|
||||
return jobExecutionConfiguration;
|
||||
@ -210,9 +210,20 @@ public class MTMVJobManager implements MTMVHookService {
|
||||
job.cancelTaskById(info.getTaskId());
|
||||
}
|
||||
|
||||
public void onCommit(MTMV mtmv) throws DdlException, JobException {
|
||||
MTMVJob job = getJobByMTMV(mtmv);
|
||||
MTMVTaskContext mtmvTaskContext = new MTMVTaskContext(MTMVTaskTriggerMode.COMMIT, Lists.newArrayList(),
|
||||
false);
|
||||
Env.getCurrentEnv().getJobManager().triggerJob(job.getJobId(), mtmvTaskContext);
|
||||
}
|
||||
|
||||
private MTMVJob getJobByTableNameInfo(TableNameInfo info) throws DdlException, MetaNotFoundException {
|
||||
Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(info.getDb());
|
||||
MTMV mtmv = (MTMV) db.getTableOrMetaException(info.getTbl(), TableType.MATERIALIZED_VIEW);
|
||||
return getJobByMTMV(mtmv);
|
||||
}
|
||||
|
||||
private MTMVJob getJobByMTMV(MTMV mtmv) throws DdlException {
|
||||
List<MTMVJob> jobs = Env.getCurrentEnv().getJobManager()
|
||||
.queryJobs(JobType.MV, mtmv.getJobInfo().getJobName());
|
||||
if (CollectionUtils.isEmpty(jobs) || jobs.size() != 1) {
|
||||
|
||||
@ -43,6 +43,7 @@ public class MTMVRefreshEnum {
|
||||
*/
|
||||
public enum RefreshTrigger {
|
||||
MANUAL, //manual
|
||||
COMMIT, //manual
|
||||
SCHEDULE // schedule
|
||||
}
|
||||
|
||||
|
||||
@ -22,8 +22,13 @@ import org.apache.doris.catalog.Table;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.common.DdlException;
|
||||
import org.apache.doris.common.MetaNotFoundException;
|
||||
import org.apache.doris.event.Event;
|
||||
import org.apache.doris.event.EventException;
|
||||
import org.apache.doris.event.EventListener;
|
||||
import org.apache.doris.event.TableEvent;
|
||||
import org.apache.doris.job.exception.JobException;
|
||||
import org.apache.doris.job.extensions.mtmv.MTMVTask;
|
||||
import org.apache.doris.mtmv.MTMVRefreshEnum.RefreshTrigger;
|
||||
import org.apache.doris.nereids.trees.plans.commands.info.CancelMTMVTaskInfo;
|
||||
import org.apache.doris.nereids.trees.plans.commands.info.PauseMTMVInfo;
|
||||
import org.apache.doris.nereids.trees.plans.commands.info.RefreshMTMVInfo;
|
||||
@ -36,8 +41,9 @@ import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
|
||||
public class MTMVService {
|
||||
public class MTMVService implements EventListener {
|
||||
private static final Logger LOG = LogManager.getLogger(MTMVService.class);
|
||||
|
||||
private Map<String, MTMVHookService> hooks = Maps.newConcurrentMap();
|
||||
@ -162,4 +168,27 @@ public class MTMVService {
|
||||
mtmvHookService.cancelMTMVTask(info);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void processEvent(Event event) throws EventException {
|
||||
Objects.requireNonNull(event);
|
||||
if (!(event instanceof TableEvent)) {
|
||||
return;
|
||||
}
|
||||
TableEvent tableEvent = (TableEvent) event;
|
||||
LOG.info("processEvent, Event: {}", event);
|
||||
Set<BaseTableInfo> mtmvs = relationManager.getMtmvsByBaseTableOneLevel(
|
||||
new BaseTableInfo(tableEvent.getTableId(), tableEvent.getDbId(), tableEvent.getCtlId()));
|
||||
for (BaseTableInfo baseTableInfo : mtmvs) {
|
||||
try {
|
||||
// check if mtmv should trigger by event
|
||||
MTMV mtmv = MTMVUtil.getMTMV(baseTableInfo.getDbId(), baseTableInfo.getTableId());
|
||||
if (mtmv.getRefreshInfo().getRefreshTriggerInfo().getRefreshTrigger().equals(RefreshTrigger.COMMIT)) {
|
||||
jobManager.onCommit(mtmv);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
throw new EventException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -700,6 +700,9 @@ public class LogicalPlanBuilder extends DorisParserBaseVisitor<Object> {
|
||||
if (ctx.MANUAL() != null) {
|
||||
return new MTMVRefreshTriggerInfo(RefreshTrigger.MANUAL);
|
||||
}
|
||||
if (ctx.COMMIT() != null) {
|
||||
return new MTMVRefreshTriggerInfo(RefreshTrigger.COMMIT);
|
||||
}
|
||||
if (ctx.SCHEDULE() != null) {
|
||||
return new MTMVRefreshTriggerInfo(RefreshTrigger.SCHEDULE, visitRefreshSchedule(ctx.refreshSchedule()));
|
||||
}
|
||||
|
||||
@ -49,6 +49,7 @@ import org.apache.doris.common.util.InternalDatabaseUtil;
|
||||
import org.apache.doris.common.util.MetaLockUtils;
|
||||
import org.apache.doris.common.util.TimeUtils;
|
||||
import org.apache.doris.datasource.InternalCatalog;
|
||||
import org.apache.doris.event.DataChangeEvent;
|
||||
import org.apache.doris.metric.MetricRepo;
|
||||
import org.apache.doris.mysql.privilege.PrivPredicate;
|
||||
import org.apache.doris.persist.BatchRemoveTransactionsOperationV2;
|
||||
@ -1052,6 +1053,17 @@ public class DatabaseTransactionMgr {
|
||||
} finally {
|
||||
MetaLockUtils.writeUnlockTables(tableList);
|
||||
}
|
||||
// Here, we only wait for the EventProcessor to finish processing the event,
|
||||
// but regardless of the success or failure of the result,
|
||||
// it does not affect the logic of transaction
|
||||
try {
|
||||
produceEvent(transactionState, db);
|
||||
} catch (Throwable t) {
|
||||
// According to normal logic, no exceptions will be thrown,
|
||||
// but in order to avoid bugs affecting the original logic, all exceptions are caught
|
||||
LOG.warn("produceEvent failed: ", t);
|
||||
}
|
||||
|
||||
// The visible latch should only be counted down after all things are done
|
||||
// (finish transaction, write edit log, etc).
|
||||
// Otherwise, there is no way for stream load to query the result right after loading finished,
|
||||
@ -1075,6 +1087,26 @@ public class DatabaseTransactionMgr {
|
||||
}
|
||||
}
|
||||
|
||||
private void produceEvent(TransactionState transactionState, Database db) {
|
||||
Collection<TableCommitInfo> tableCommitInfos;
|
||||
if (!transactionState.getSubTxnIdToTableCommitInfo().isEmpty()) {
|
||||
tableCommitInfos = transactionState.getSubTxnTableCommitInfos();
|
||||
} else {
|
||||
tableCommitInfos = transactionState.getIdToTableCommitInfos().values();
|
||||
}
|
||||
for (TableCommitInfo tableCommitInfo : tableCommitInfos) {
|
||||
long tableId = tableCommitInfo.getTableId();
|
||||
OlapTable table = (OlapTable) db.getTableNullable(tableId);
|
||||
if (table == null) {
|
||||
LOG.warn("table {} does not exist when produceEvent. transaction: {}, db: {}",
|
||||
tableId, transactionState.getTransactionId(), db.getId());
|
||||
continue;
|
||||
}
|
||||
Env.getCurrentEnv().getEventProcessor().processEvent(
|
||||
new DataChangeEvent(db.getCatalog().getId(), db.getId(), tableId));
|
||||
}
|
||||
}
|
||||
|
||||
private boolean finishCheckPartitionVersion(TransactionState transactionState, Database db,
|
||||
List<Pair<OlapTable, Partition>> relatedTblPartitions) {
|
||||
Iterator<TableCommitInfo> tableCommitInfoIterator
|
||||
|
||||
Reference in New Issue
Block a user