[feature-wip](MTMV) Sync finish status only for tasks (#20441)

MTMV tasks keep finish status only to reduce the loss caused by logging.
After changes, unfinished tasks will be lost directly when FE master restarts.
This commit is contained in:
zhangdong
2023-06-08 10:46:25 +08:00
committed by GitHub
parent d2d6ce5d0b
commit 46c68d11aa
8 changed files with 16 additions and 312 deletions

View File

@ -55,7 +55,6 @@ import org.apache.doris.load.loadv2.LoadJobFinalOperation;
import org.apache.doris.load.routineload.RoutineLoadJob;
import org.apache.doris.load.sync.SyncJob;
import org.apache.doris.mtmv.metadata.ChangeMTMVJob;
import org.apache.doris.mtmv.metadata.ChangeMTMVTask;
import org.apache.doris.mtmv.metadata.DropMTMVJob;
import org.apache.doris.mtmv.metadata.DropMTMVTask;
import org.apache.doris.mtmv.metadata.MTMVJob;
@ -762,7 +761,7 @@ public class JournalEntity implements Writable {
break;
}
case OperationType.OP_CHANGE_MTMV_TASK: {
data = ChangeMTMVTask.read(in);
Text.readString(in);
isRead = true;
break;
}

View File

@ -26,10 +26,8 @@ import org.apache.doris.metric.Metric;
import org.apache.doris.metric.MetricLabel;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.mtmv.MTMVUtils.JobState;
import org.apache.doris.mtmv.MTMVUtils.TaskRetryPolicy;
import org.apache.doris.mtmv.MTMVUtils.TriggerMode;
import org.apache.doris.mtmv.metadata.ChangeMTMVJob;
import org.apache.doris.mtmv.metadata.ChangeMTMVTask;
import org.apache.doris.mtmv.metadata.MTMVCheckpointData;
import org.apache.doris.mtmv.metadata.MTMVJob;
import org.apache.doris.mtmv.metadata.MTMVJob.JobSchedule;
@ -88,8 +86,6 @@ public class MTMVJobManager {
public void start() {
if (isStarted.compareAndSet(false, true)) {
taskManager.clearUnfinishedTasks();
// check the scheduler before using it
// since it may be shutdown when master change to follower without process shutdown.
if (periodScheduler.isShutdown()) {
@ -219,11 +215,9 @@ public class MTMVJobManager {
periodFutureMap.put(job.getId(), future);
periodNum++;
} else if (job.getTriggerMode() == TriggerMode.ONCE) {
if (job.getRetryPolicy() == TaskRetryPolicy.ALWAYS || job.getRetryPolicy() == TaskRetryPolicy.TIMES) {
MTMVTaskExecuteParams executeOption = new MTMVTaskExecuteParams();
submitJobTask(job.getName(), executeOption);
onceNum++;
}
MTMVTaskExecuteParams executeOption = new MTMVTaskExecuteParams();
submitJobTask(job.getName(), executeOption);
onceNum++;
}
}
LOG.info("Register {} period jobs and {} once jobs in the total {} jobs.", periodNum, onceNum, num);
@ -477,10 +471,6 @@ public class MTMVJobManager {
taskManager.replayCreateJobTask(task);
}
public void replayUpdateTask(ChangeMTMVTask changeTask) {
taskManager.replayUpdateTask(changeTask);
}
public void replayDropJobTasks(List<String> taskIds) {
taskManager.dropTasks(taskIds, true);
}
@ -527,7 +517,7 @@ public class MTMVJobManager {
public long write(DataOutputStream dos, long checksum) throws IOException {
MTMVCheckpointData data = new MTMVCheckpointData();
data.jobs = new ArrayList<>(nameToJobMap.values());
data.tasks = taskManager.showTasks(null);
data.tasks = Lists.newArrayList(taskManager.getHistoryTasks());
String s = GsonUtils.GSON.toJson(data);
Text.writeString(dos, s);
return checksum;
@ -553,7 +543,6 @@ public class MTMVJobManager {
return mtmvJobManager;
}
// for test only
public MTMVTaskManager getTaskManager() {
return taskManager;
}

View File

@ -17,9 +17,7 @@
package org.apache.doris.mtmv;
import org.apache.doris.catalog.Env;
import org.apache.doris.mtmv.MTMVUtils.TaskState;
import org.apache.doris.mtmv.metadata.ChangeMTMVTask;
import org.apache.doris.mtmv.metadata.MTMVTask;
import org.apache.logging.log4j.LogManager;
@ -78,10 +76,6 @@ public class MTMVTaskExecutorPool {
task.setErrorCode(-1);
}
task.setFinishTime(MTMVUtils.getNowTimeStamp());
ChangeMTMVTask changeTask = new ChangeMTMVTask(taskExecutor.getJob().getId(), task, TaskState.RUNNING,
task.getState());
Env.getCurrentEnv().getEditLog().logChangeMTMVTask(changeTask);
});
taskExecutor.setFuture(future);
}

View File

@ -24,8 +24,6 @@ import org.apache.doris.mtmv.MTMVUtils.JobState;
import org.apache.doris.mtmv.MTMVUtils.TaskState;
import org.apache.doris.mtmv.MTMVUtils.TriggerMode;
import org.apache.doris.mtmv.metadata.ChangeMTMVJob;
import org.apache.doris.mtmv.metadata.ChangeMTMVTask;
import org.apache.doris.mtmv.metadata.MTMVJob;
import org.apache.doris.mtmv.metadata.MTMVTask;
import org.apache.doris.qe.ConnectContext;
@ -127,7 +125,6 @@ public class MTMVTaskManager {
MTMVTask task = taskExecutor.initTask(taskId, MTMVUtils.getNowTimeStamp());
task.setPriority(params.getPriority());
LOG.info("Submit a mtmv task with id: {} of the job {}.", taskId, taskExecutor.getJob().getName());
Env.getCurrentEnv().getEditLog().logCreateMTMVTask(task);
arrangeToPendingTask(taskExecutor);
return MTMVUtils.TaskSubmitStatus.SUBMITTED;
}
@ -201,7 +198,7 @@ public class MTMVTaskManager {
if (finalState == TaskState.FAILURE) {
failedTaskCount.incrementAndGet();
}
changeAndLogTaskStatus(taskExecutor.getJobId(), taskExecutor.getTask(), TaskState.RUNNING, finalState);
Env.getCurrentEnv().getEditLog().logCreateMTMVTask(taskExecutor.getTask());
TriggerMode triggerMode = taskExecutor.getJob().getTriggerMode();
if (triggerMode == TriggerMode.ONCE) {
@ -239,19 +236,12 @@ public class MTMVTaskManager {
MTMVTaskExecutor pendingTaskExecutor = taskQueue.poll();
taskExecutorPool.executeTask(pendingTaskExecutor);
runningTaskMap.put(jobId, pendingTaskExecutor);
// change status from PENDING to Running
changeAndLogTaskStatus(jobId, pendingTaskExecutor.getTask(), TaskState.PENDING, TaskState.RUNNING);
currentRunning++;
}
}
}
}
private void changeAndLogTaskStatus(long jobId, MTMVTask task, TaskState fromStatus, TaskState toStatus) {
ChangeMTMVTask changeTask = new ChangeMTMVTask(jobId, task, fromStatus, toStatus);
Env.getCurrentEnv().getEditLog().logChangeMTMVTask(changeTask);
}
public boolean tryLock() {
try {
return reentrantLock.tryLock(5, TimeUnit.SECONDS);
@ -328,86 +318,7 @@ public class MTMVTaskManager {
}
public void replayCreateJobTask(MTMVTask task) {
if (task.getState() == TaskState.SUCCESS || task.getState() == TaskState.FAILURE) {
if (MTMVUtils.getNowTimeStamp() > task.getExpireTime()) {
return;
}
}
switch (task.getState()) {
case PENDING:
String jobName = task.getJobName();
MTMVJob job = mtmvJobManager.getJob(jobName);
if (job == null) {
LOG.warn("fail to obtain task name {} because task is null", jobName);
return;
}
MTMVTaskExecutor taskExecutor = MTMVUtils.buildTask(job);
taskExecutor.setTask(task);
arrangeToPendingTask(taskExecutor);
break;
case RUNNING:
task.setState(TaskState.FAILURE);
addHistory(task);
break;
case FAILURE:
case SUCCESS:
addHistory(task);
break;
default:
break;
}
}
public void replayUpdateTask(ChangeMTMVTask changeTask) {
TaskState fromStatus = changeTask.getFromStatus();
TaskState toStatus = changeTask.getToStatus();
Long jobId = changeTask.getJobId();
if (fromStatus == TaskState.PENDING) {
Queue<MTMVTaskExecutor> taskQueue = getPendingTaskMap().get(jobId);
if (taskQueue == null) {
return;
}
if (taskQueue.size() == 0) {
getPendingTaskMap().remove(jobId);
return;
}
MTMVTaskExecutor pendingTask = taskQueue.poll();
MTMVTask status = pendingTask.getTask();
if (toStatus == TaskState.RUNNING) {
if (status.getTaskId().equals(changeTask.getTaskId())) {
status.setState(TaskState.RUNNING);
getRunningTaskMap().put(jobId, pendingTask);
}
} else if (toStatus == TaskState.FAILURE) {
status.setMessage(changeTask.getErrorMessage());
status.setErrorCode(changeTask.getErrorCode());
status.setState(TaskState.FAILURE);
addHistory(status);
}
if (taskQueue.size() == 0) {
getPendingTaskMap().remove(jobId);
}
} else if (fromStatus == TaskState.RUNNING && (toStatus == TaskState.SUCCESS
|| toStatus == TaskState.FAILURE)) {
MTMVTaskExecutor runningTask = getRunningTaskMap().remove(jobId);
if (runningTask == null) {
return;
}
MTMVTask status = runningTask.getTask();
if (status.getTaskId().equals(changeTask.getTaskId())) {
status.setMessage(changeTask.getErrorMessage());
status.setErrorCode(changeTask.getErrorCode());
status.setState(toStatus);
status.setFinishTime(changeTask.getFinishTime());
addHistory(status);
}
} else {
LOG.warn("Illegal Task taskId:{} status transform from {} to {}", changeTask.getTaskId(), fromStatus,
toStatus);
}
addHistory(task);
}
public void clearTasksByJobName(String jobName, boolean isReplay) {
@ -462,11 +373,13 @@ public class MTMVTaskManager {
Set<String> taskSet = new HashSet<>(taskIds);
// Pending tasks will be clear directly. So we don't drop it again here.
// Check the running task since the task was killed but was not move to the history queue.
for (long key : runningTaskMap.keySet()) {
MTMVTaskExecutor executor = runningTaskMap.get(key);
// runningTaskMap may be removed in the runningIterator
if (executor != null && taskSet.contains(executor.getTask().getTaskId())) {
runningTaskMap.remove(key);
if (!isReplay) {
for (long key : runningTaskMap.keySet()) {
MTMVTaskExecutor executor = runningTaskMap.get(key);
// runningTaskMap may be removed in the runningIterator
if (executor != null && taskSet.contains(executor.getTask().getTaskId())) {
runningTaskMap.remove(key);
}
}
}
// Try to remove history tasks.
@ -479,40 +392,4 @@ public class MTMVTaskManager {
}
LOG.info("drop task history:{}", taskIds);
}
public void clearUnfinishedTasks() {
if (!tryLock()) {
return;
}
try {
Iterator<Long> pendingIter = getPendingTaskMap().keySet().iterator();
while (pendingIter.hasNext()) {
Queue<MTMVTaskExecutor> tasks = getPendingTaskMap().get(pendingIter.next());
while (!tasks.isEmpty()) {
MTMVTaskExecutor taskExecutor = tasks.poll();
taskExecutor.getTask().setMessage("Fe abort the task");
taskExecutor.getTask().setErrorCode(-1);
taskExecutor.getTask().setState(TaskState.FAILURE);
addHistory(taskExecutor.getTask());
changeAndLogTaskStatus(taskExecutor.getJobId(), taskExecutor.getTask(), TaskState.PENDING,
TaskState.FAILURE);
}
pendingIter.remove();
}
Iterator<Long> runningIter = getRunningTaskMap().keySet().iterator();
while (runningIter.hasNext()) {
MTMVTaskExecutor taskExecutor = getRunningTaskMap().get(runningIter.next());
taskExecutor.getTask().setMessage("Fe abort the task");
taskExecutor.getTask().setErrorCode(-1);
taskExecutor.getTask().setState(TaskState.FAILURE);
taskExecutor.getTask().setFinishTime(MTMVUtils.getNowTimeStamp());
runningIter.remove();
addHistory(taskExecutor.getTask());
changeAndLogTaskStatus(taskExecutor.getJobId(), taskExecutor.getTask(), TaskState.RUNNING,
TaskState.FAILURE);
}
} finally {
unlock();
}
}
}

View File

@ -1,131 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.mtmv.metadata;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.mtmv.MTMVUtils.TaskState;
import org.apache.doris.persist.gson.GsonUtils;
import com.google.gson.annotations.SerializedName;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class ChangeMTMVTask implements Writable {
@SerializedName("jobId")
private long jobId;
@SerializedName("taskId")
private String taskId;
@SerializedName("finishTime")
private long finishTime;
@SerializedName("fromStatus")
TaskState fromStatus;
@SerializedName("toStatus")
TaskState toStatus;
@SerializedName("errorCode")
private int errorCode = -1;
@SerializedName("errorMessage")
private String errorMessage = "";
public ChangeMTMVTask(long jobId, MTMVTask task, TaskState fromStatus, TaskState toStatus) {
this.jobId = jobId;
this.taskId = task.getTaskId();
this.fromStatus = fromStatus;
this.toStatus = toStatus;
this.finishTime = task.getFinishTime();
errorCode = task.getErrorCode();
errorMessage = task.getMessage();
}
public long getJobId() {
return jobId;
}
public void setJobId(long jobId) {
this.jobId = jobId;
}
public String getTaskId() {
return taskId;
}
public void setTaskId(String taskId) {
this.taskId = taskId;
}
public TaskState getFromStatus() {
return fromStatus;
}
public void setFromStatus(TaskState fromStatus) {
this.fromStatus = fromStatus;
}
public TaskState getToStatus() {
return toStatus;
}
public void setToStatus(TaskState toStatus) {
this.toStatus = toStatus;
}
public int getErrorCode() {
return errorCode;
}
public void setErrorCode(int errorCode) {
this.errorCode = errorCode;
}
public String getErrorMessage() {
return errorMessage;
}
public void setErrorMessage(String errorMessage) {
this.errorMessage = errorMessage;
}
public long getFinishTime() {
return finishTime;
}
public void setFinishTime(long finishTime) {
this.finishTime = finishTime;
}
public static ChangeMTMVTask read(DataInput in) throws IOException {
String json = Text.readString(in);
return GsonUtils.GSON.fromJson(json, ChangeMTMVTask.class);
}
@Override
public void write(DataOutput out) throws IOException {
String json = GsonUtils.GSON.toJson(this);
Text.writeString(out, json);
}
}

View File

@ -17,32 +17,14 @@
package org.apache.doris.mtmv.metadata;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.persist.gson.GsonUtils;
import com.google.gson.annotations.SerializedName;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.List;
public class MTMVCheckpointData implements Writable {
public class MTMVCheckpointData {
@SerializedName("jobs")
public List<MTMVJob> jobs;
@SerializedName("tasks")
public List<MTMVTask> tasks;
@Override
public void write(DataOutput out) throws IOException {
String json = GsonUtils.GSON.toJson(this);
Text.writeString(out, json);
}
public static MTMVCheckpointData read(DataInput in) throws IOException {
String json = Text.readString(in);
return GsonUtils.GSON.fromJson(json, MTMVCheckpointData.class);
}
}

View File

@ -69,7 +69,6 @@ import org.apache.doris.load.sync.SyncJob;
import org.apache.doris.meta.MetaContext;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.mtmv.metadata.ChangeMTMVJob;
import org.apache.doris.mtmv.metadata.ChangeMTMVTask;
import org.apache.doris.mtmv.metadata.DropMTMVJob;
import org.apache.doris.mtmv.metadata.DropMTMVTask;
import org.apache.doris.mtmv.metadata.MTMVJob;
@ -917,8 +916,6 @@ public class EditLog {
break;
}
case OperationType.OP_CHANGE_MTMV_TASK: {
final ChangeMTMVTask changeTask = (ChangeMTMVTask) journal.getData();
env.getMTMVJobManager().replayUpdateTask(changeTask);
break;
}
case OperationType.OP_DROP_MTMV_TASK: {
@ -1694,10 +1691,6 @@ public class EditLog {
logEdit(OperationType.OP_CREATE_MTMV_TASK, task);
}
public void logChangeMTMVTask(ChangeMTMVTask changeTaskRecord) {
logEdit(OperationType.OP_CHANGE_MTMV_TASK, changeTaskRecord);
}
public void logDropMTMVTasks(List<String> taskIds) {
logEdit(OperationType.OP_DROP_MTMV_TASK, new DropMTMVTask(taskIds));
}

View File

@ -266,6 +266,7 @@ public class OperationType {
public static final short OP_CREATE_MTMV_TASK = 340;
public static final short OP_DROP_MTMV_TASK = 341;
@Deprecated
public static final short OP_CHANGE_MTMV_TASK = 342;
public static final short OP_ALTER_MTMV_STMT = 345;