[Fix](MTMV)Support master and follow change in multi fe for mtmv (#16149)

Support master and follow change in multi fe for mtmv

This PR fixes following issues:

1. Start the mtmv only in master node, if master change to follower, it will stop the scheduler.
2. Fix a double meta write here
3. Rename some edit log function and variables
4. If a mv both have PeriodicalJob and immediate job and PeriodicalJob will be trigger right now, scheduler will ignore the immediate job.
5. Fix expired time bugs, and make sure it will be clean among all the fes.
6. cleanerScheduler interval from 1 day to 1 minute.
This commit is contained in:
huangzhaowei
2023-02-01 20:02:46 +08:00
committed by GitHub
parent f14c62b274
commit 0842aa2947
13 changed files with 147 additions and 64 deletions

View File

@ -854,8 +854,6 @@ public class Env {
// If not using bdb, we need to notify the FE type transfer manually.
notifyNewFETypeTransfer(FrontendNodeType.MASTER);
}
// 7. start mtmv jobManager
mtmvJobManager.start();
}
// wait until FE is ready.
@ -1412,7 +1410,8 @@ public class Env {
if (Config.enable_hms_events_incremental_sync) {
metastoreEventsProcessor.start();
}
// start mtmv jobManager
mtmvJobManager.start();
}
// start threads that should running on all FE
@ -1462,6 +1461,9 @@ public class Env {
startNonMasterDaemonThreads();
MetricRepo.init();
// stop mtmv scheduler
mtmvJobManager.stop();
}
// Set global variable 'lower_case_table_names' only when the cluster is initialized.

View File

@ -956,7 +956,7 @@ public class InternalCatalog implements CatalogIf<Database> {
if (table instanceof MaterializedView && Config.enable_mtmv_scheduler_framework) {
List<Long> dropIds = Env.getCurrentEnv().getMTMVJobManager().showJobs(db.getFullName(), table.getName())
.stream().map(MTMVJob::getId).collect(Collectors.toList());
Env.getCurrentEnv().getMTMVJobManager().dropJobs(dropIds, false);
Env.getCurrentEnv().getMTMVJobManager().dropJobs(dropIds, isReplay);
LOG.info("Drop related {} mv job.", dropIds.size());
}
LOG.info("finished dropping table[{}] in db[{}]", table.getName(), db.getFullName());

View File

@ -744,7 +744,7 @@ public class JournalEntity implements Writable {
isRead = true;
break;
}
case OperationType.OP_ALTER_MTMV_JOB: {
case OperationType.OP_CHANGE_MTMV_JOB: {
data = ChangeMTMVJob.read(in);
isRead = true;
break;
@ -759,7 +759,7 @@ public class JournalEntity implements Writable {
isRead = true;
break;
}
case OperationType.OP_ALTER_MTMV_TASK: {
case OperationType.OP_CHANGE_MTMV_TASK: {
data = ChangeMTMVTask.read(in);
isRead = true;
break;

View File

@ -57,14 +57,18 @@ public class MTMVJobFactory {
public static List<MTMVJob> buildJob(MaterializedView materializedView, String dbName) {
List<MTMVJob> jobs = new ArrayList<>();
if (materializedView.getBuildMode() == BuildMode.IMMEDIATE) {
jobs.add(genOnceJob(materializedView, dbName));
}
MVRefreshTriggerInfo triggerInfo = materializedView.getRefreshInfo().getTriggerInfo();
boolean isRunPeriodJobImmediate = false;
if (triggerInfo != null && triggerInfo.getRefreshTrigger() == RefreshTrigger.INTERVAL) {
jobs.add(genPeriodicalJob(materializedView, dbName));
MTMVJob job = genPeriodicalJob(materializedView, dbName);
isRunPeriodJobImmediate = MTMVUtils.getDelaySeconds(job) == 0;
jobs.add(job);
}
// if the PeriodicalJob run immediate since an early start time, don't run the immediate build.
if (!isRunPeriodJobImmediate && materializedView.getBuildMode() == BuildMode.IMMEDIATE) {
jobs.add(genOnceJob(materializedView, dbName));
}
return jobs;
}

View File

@ -62,9 +62,9 @@ public class MTMVJobManager {
private final MTMVTaskManager taskManager;
private final ScheduledExecutorService periodScheduler = Executors.newScheduledThreadPool(1);
private ScheduledExecutorService periodScheduler = Executors.newScheduledThreadPool(1);
private final ScheduledExecutorService cleanerScheduler = Executors.newScheduledThreadPool(1);
private ScheduledExecutorService cleanerScheduler = Executors.newScheduledThreadPool(1);
private final ReentrantLock reentrantLock;
@ -82,8 +82,17 @@ public class MTMVJobManager {
if (isStarted.compareAndSet(false, true)) {
taskManager.clearUnfinishedTasks();
// check the scheduler before using it
// since it may be shutdown when master change to follower without process shutdown.
if (periodScheduler.isShutdown()) {
periodScheduler = Executors.newScheduledThreadPool(1);
}
registerJobs();
if (cleanerScheduler.isShutdown()) {
cleanerScheduler = Executors.newScheduledThreadPool(1);
}
cleanerScheduler.scheduleAtFixedRate(() -> {
if (!Env.getCurrentEnv().isMaster()) {
return;
@ -99,29 +108,43 @@ public class MTMVJobManager {
} finally {
unlock();
}
}, 0, 1, TimeUnit.DAYS);
}, 0, 1, TimeUnit.MINUTES);
taskManager.startTaskScheduler();
}
}
public void stop() {
if (isStarted.compareAndSet(true, false)) {
periodScheduler.shutdown();
cleanerScheduler.shutdown();
taskManager.stopTaskScheduler();
}
}
private void registerJobs() {
int num = nameToJobMap.size();
int periodNum = 0;
int onceNum = 0;
for (MTMVJob job : nameToJobMap.values()) {
if (job.getState() != JobState.ACTIVE) {
if (!job.getState().equals(JobState.ACTIVE)) {
continue;
}
if (job.getTriggerMode() == TriggerMode.PERIODICAL) {
JobSchedule schedule = job.getSchedule();
ScheduledFuture<?> future = periodScheduler.scheduleAtFixedRate(() -> submitJobTask(job.getName()),
MTMVUtils.getDelaySeconds(job), schedule.getPeriod(), schedule.getTimeUnit());
MTMVUtils.getDelaySeconds(job), schedule.getSecondPeriod(), TimeUnit.SECONDS);
periodFutureMap.put(job.getId(), future);
periodNum++;
} else if (job.getTriggerMode() == TriggerMode.ONCE) {
if (job.getRetryPolicy() == TaskRetryPolicy.ALWAYS || job.getRetryPolicy() == TaskRetryPolicy.TIMES) {
MTMVTaskExecuteParams executeOption = new MTMVTaskExecuteParams();
submitJobTask(job.getName(), executeOption);
onceNum++;
}
}
}
LOG.info("Register {} period jobs and {} once jobs in the total {} jobs.", periodNum, onceNum, num);
}
public void createJob(MTMVJob job, boolean isReplay) throws DdlException {
@ -146,27 +169,33 @@ public class MTMVJobManager {
idToJobMap.put(job.getId(), job);
if (!isReplay) {
// log job before submit any task.
Env.getCurrentEnv().getEditLog().logCreateScheduleJob(job);
Env.getCurrentEnv().getEditLog().logCreateMTMVJob(job);
ScheduledFuture<?> future = periodScheduler.scheduleAtFixedRate(() -> submitJobTask(job.getName()),
MTMVUtils.getDelaySeconds(job), schedule.getPeriod(), schedule.getTimeUnit());
MTMVUtils.getDelaySeconds(job), schedule.getSecondPeriod(), TimeUnit.SECONDS);
periodFutureMap.put(job.getId(), future);
}
} else if (job.getTriggerMode() == TriggerMode.ONCE) {
job.setState(JobState.ACTIVE);
job.setExpireTime(MTMVUtils.getNowTimeStamp() + Config.scheduler_mtmv_job_expired);
// only change once job state from unknown to active. if job is completed, only put it in map
if (job.getState() == JobState.UNKNOWN) {
job.setState(JobState.ACTIVE);
job.setExpireTime(MTMVUtils.getNowTimeStamp() + Config.scheduler_mtmv_job_expired);
}
nameToJobMap.put(job.getName(), job);
idToJobMap.put(job.getId(), job);
if (!isReplay) {
Env.getCurrentEnv().getEditLog().logCreateScheduleJob(job);
Env.getCurrentEnv().getEditLog().logCreateMTMVJob(job);
MTMVTaskExecuteParams executeOption = new MTMVTaskExecuteParams();
submitJobTask(job.getName(), executeOption);
}
} else if (job.getTriggerMode() == TriggerMode.MANUAL) {
job.setState(JobState.ACTIVE);
// only change once job state from unknown to active. if job is completed, only put it in map
if (job.getState() == JobState.UNKNOWN) {
job.setState(JobState.ACTIVE);
}
nameToJobMap.put(job.getName(), job);
idToJobMap.put(job.getId(), job);
if (!isReplay) {
Env.getCurrentEnv().getEditLog().logCreateScheduleJob(job);
Env.getCurrentEnv().getEditLog().logCreateMTMVJob(job);
}
} else {
throw new DdlException("Unsupported trigger mode for multi-table mv.");
@ -235,7 +264,7 @@ public class MTMVJobManager {
job.setState(changeJob.getToStatus());
job.setLastModifyTime(changeJob.getLastModifyTime());
if (!isReplay) {
Env.getCurrentEnv().getEditLog().logChangeScheduleJob(changeJob);
Env.getCurrentEnv().getEditLog().logChangeMTMVJob(changeJob);
}
} finally {
unlock();
@ -268,7 +297,7 @@ public class MTMVJobManager {
}
if (!isReplay) {
Env.getCurrentEnv().getEditLog().logDropScheduleJob(jobIds);
Env.getCurrentEnv().getEditLog().logDropMTMVJob(jobIds);
}
} finally {
unlock();
@ -343,11 +372,7 @@ public class MTMVJobManager {
}
public void replayDropJobTasks(List<String> taskIds) {
Map<String, String> index = Maps.newHashMapWithExpectedSize(taskIds.size());
for (String taskId : taskIds) {
index.put(taskId, null);
}
taskManager.getAllHistory().removeIf(runStatus -> index.containsKey(runStatus.getTaskId()));
taskManager.dropTasks(taskIds, true);
}
public void removeExpiredJobs() {
@ -382,7 +407,7 @@ public class MTMVJobManager {
unlock();
}
dropJobs(jobIdsToDelete, true);
dropJobs(jobIdsToDelete, false);
}
public MTMVJob getJob(String jobName) {

View File

@ -125,6 +125,10 @@ public class MTMVTaskExecutor implements Comparable<MTMVTaskExecutor> {
return task;
}
public void setTask(MTMVTask task) {
this.task = task;
}
public MTMVTask initTask(String taskId, Long createTime) {
MTMVTask task = new MTMVTask();
task.setTaskId(taskId);

View File

@ -81,7 +81,7 @@ public class MTMVTaskExecutorPool {
ChangeMTMVTask changeTask = new ChangeMTMVTask(taskExecutor.getJob().getId(), task, TaskState.RUNNING,
task.getState());
Env.getCurrentEnv().getEditLog().logAlterScheduleTask(changeTask);
Env.getCurrentEnv().getEditLog().logChangeMTMVTask(changeTask);
});
taskExecutor.setFuture(future);
}

View File

@ -37,10 +37,12 @@ import org.apache.logging.log4j.Logger;
import org.jetbrains.annotations.Nullable;
import java.util.Deque;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@ -67,7 +69,7 @@ public class MTMVTaskManager {
// keep track of all the completed tasks
private final Deque<MTMVTask> historyQueue = Queues.newLinkedBlockingDeque();
private final ScheduledExecutorService taskScheduler = Executors.newScheduledThreadPool(1);
private ScheduledExecutorService taskScheduler = Executors.newScheduledThreadPool(1);
private final MTMVJobManager mtmvJobManager;
@ -76,6 +78,9 @@ public class MTMVTaskManager {
}
public void startTaskScheduler() {
if (taskScheduler.isShutdown()) {
taskScheduler = Executors.newScheduledThreadPool(1);
}
taskScheduler.scheduleAtFixedRate(() -> {
if (!tryLock()) {
return;
@ -91,6 +96,10 @@ public class MTMVTaskManager {
}, 0, 1, TimeUnit.SECONDS);
}
public void stopTaskScheduler() {
taskScheduler.shutdown();
}
public MTMVUtils.TaskSubmitStatus submitTask(MTMVTaskExecutor taskExecutor, MTMVTaskExecuteParams params) {
// duplicate submit
if (taskExecutor.getTask() != null) {
@ -113,7 +122,8 @@ public class MTMVTaskManager {
String taskId = UUID.randomUUID().toString();
MTMVTask task = taskExecutor.initTask(taskId, MTMVUtils.getNowTimeStamp());
task.setPriority(params.getPriority());
Env.getCurrentEnv().getEditLog().logCreateScheduleTask(task);
LOG.info("Submit a mtmv task with id: {} of the job {}.", taskId, taskExecutor.getJob().getName());
Env.getCurrentEnv().getEditLog().logCreateMTMVTask(task);
arrangeToPendingTask(taskExecutor);
return MTMVUtils.TaskSubmitStatus.SUBMITTED;
}
@ -228,7 +238,7 @@ public class MTMVTaskManager {
private void changeAndLogTaskStatus(long jobId, MTMVTask task, TaskState fromStatus, TaskState toStatus) {
ChangeMTMVTask changeTask = new ChangeMTMVTask(jobId, task, fromStatus, toStatus);
Env.getCurrentEnv().getEditLog().logAlterScheduleTask(changeTask);
Env.getCurrentEnv().getEditLog().logChangeMTMVTask(changeTask);
}
public boolean tryLock() {
@ -322,7 +332,7 @@ public class MTMVTaskManager {
return;
}
MTMVTaskExecutor taskExecutor = MTMVUtils.buildTask(job);
taskExecutor.initTask(task.getTaskId(), task.getCreateTime());
taskExecutor.setTask(task);
arrangeToPendingTask(taskExecutor);
break;
case RUNNING:
@ -399,19 +409,32 @@ public class MTMVTaskManager {
}
try {
Deque<MTMVTask> taskHistory = getAllHistory();
Iterator<MTMVTask> iterator = taskHistory.iterator();
while (iterator.hasNext()) {
MTMVTask task = iterator.next();
for (MTMVTask task : taskHistory) {
long expireTime = task.getExpireTime();
if (currentTime > expireTime) {
historyToDelete.add(task.getTaskId());
iterator.remove();
}
}
} finally {
unlock();
}
LOG.info("remove task history:{}", historyToDelete);
dropTasks(historyToDelete, false);
}
public void dropTasks(List<String> taskIds, boolean isReplay) {
if (!tryLock()) {
return;
}
try {
Set<String> taskSet = new HashSet<>(taskIds);
getAllHistory().removeIf(mtmvTask -> taskSet.contains(mtmvTask.getTaskId()));
if (!isReplay) {
Env.getCurrentEnv().getEditLog().logDropMTMVTasks(taskIds);
}
} finally {
unlock();
}
LOG.info("drop task history:{}", taskIds);
}
public void clearUnfinishedTasks() {

View File

@ -85,7 +85,6 @@ public class MTMVUtils {
return getDelaySeconds(job, LocalDateTime.now());
}
// this method only for test
public static long getDelaySeconds(MTMVJob job, LocalDateTime now) {
long lastModifyTime = job.getLastModifyTime();
long nextTime = 0;
@ -117,10 +116,10 @@ public class MTMVUtils {
switch (strTimeUnit.toUpperCase()) {
case "SECOND":
return TimeUnit.SECONDS;
case "MINUTE":
return TimeUnit.MINUTES;
case "HOUR":
return TimeUnit.HOURS;
case "DAY":
return TimeUnit.DAYS;
default:
return TimeUnit.DAYS;
}

View File

@ -259,8 +259,8 @@ public class MTMVJob implements Writable, Comparable {
}
public String toString() {
return " (START " + LocalDateTime.ofInstant(Instant.ofEpochSecond(startTime), ZoneId.systemDefault())
+ " EVERY(" + period + " " + timeUnit + "))";
return "START " + LocalDateTime.ofInstant(Instant.ofEpochSecond(startTime), ZoneId.systemDefault())
+ " EVERY(" + period + " " + timeUnit + ")";
}
}

View File

@ -881,7 +881,7 @@ public class EditLog {
env.getMTMVJobManager().replayCreateJob(job);
break;
}
case OperationType.OP_ALTER_MTMV_JOB: {
case OperationType.OP_CHANGE_MTMV_JOB: {
final ChangeMTMVJob changeJob = (ChangeMTMVJob) journal.getData();
env.getMTMVJobManager().replayUpdateJob(changeJob);
break;
@ -896,7 +896,7 @@ public class EditLog {
env.getMTMVJobManager().replayCreateJobTask(task);
break;
}
case OperationType.OP_ALTER_MTMV_TASK: {
case OperationType.OP_CHANGE_MTMV_TASK: {
final ChangeMTMVTask changeTask = (ChangeMTMVTask) journal.getData();
env.getMTMVJobManager().replayUpdateTask(changeTask);
break;
@ -1613,27 +1613,27 @@ public class EditLog {
logEdit(id, log);
}
public void logCreateScheduleJob(MTMVJob job) {
public void logCreateMTMVJob(MTMVJob job) {
logEdit(OperationType.OP_CREATE_MTMV_JOB, job);
}
public void logDropScheduleJob(List<Long> jobIds) {
public void logDropMTMVJob(List<Long> jobIds) {
logEdit(OperationType.OP_DROP_MTMV_JOB, new DropMTMVJob(jobIds));
}
public void logChangeScheduleJob(ChangeMTMVJob changeJob) {
logEdit(OperationType.OP_ALTER_MTMV_JOB, changeJob);
public void logChangeMTMVJob(ChangeMTMVJob changeJob) {
logEdit(OperationType.OP_CHANGE_MTMV_JOB, changeJob);
}
public void logCreateScheduleTask(MTMVTask task) {
public void logCreateMTMVTask(MTMVTask task) {
logEdit(OperationType.OP_CREATE_MTMV_TASK, task);
}
public void logAlterScheduleTask(ChangeMTMVTask changeTaskRecord) {
logEdit(OperationType.OP_ALTER_MTMV_TASK, changeTaskRecord);
public void logChangeMTMVTask(ChangeMTMVTask changeTaskRecord) {
logEdit(OperationType.OP_CHANGE_MTMV_TASK, changeTaskRecord);
}
public void logAlterScheduleTask(List<String> taskIds) {
public void logDropMTMVTasks(List<String> taskIds) {
logEdit(OperationType.OP_DROP_MTMV_TASK, new DropMTMVTask(taskIds));
}

View File

@ -251,11 +251,11 @@ public class OperationType {
// scheduler job and task 330-350
public static final short OP_CREATE_MTMV_JOB = 330;
public static final short OP_DROP_MTMV_JOB = 331;
public static final short OP_ALTER_MTMV_JOB = 332;
public static final short OP_CHANGE_MTMV_JOB = 332;
public static final short OP_CREATE_MTMV_TASK = 340;
public static final short OP_DROP_MTMV_TASK = 341;
public static final short OP_ALTER_MTMV_TASK = 342;
public static final short OP_CHANGE_MTMV_TASK = 342;
public static final short OP_DROP_EXTERNAL_TABLE = 350;
public static final short OP_DROP_EXTERNAL_DB = 351;