[fix](mtmv)add log for resolve pending task (#28999)

* add lock for resolve pending task

* add lock for resolve pending task
This commit is contained in:
zhangdong
2023-12-26 00:29:28 +08:00
committed by GitHub
parent cefae3dc90
commit 2aea47c0a9

View File

@ -135,6 +135,7 @@ public class MTMVTask extends AbstractTask {
@Override
public void run() throws JobException {
LOG.info("mtmv task run, taskId: {}", super.getTaskId());
try {
ConnectContext ctx = MTMVPlanUtil.createMTMVContext(mtmv);
// Every time a task is run, the relation is regenerated because baseTables and baseViews may change,
@ -185,18 +186,21 @@ public class MTMVTask extends AbstractTask {
@Override
public synchronized void onFail() throws JobException {
LOG.info("mtmv task onFail, taskId: {}", super.getTaskId());
super.onFail();
after();
}
@Override
public synchronized void onSuccess() throws JobException {
LOG.info("mtmv task onSuccess, taskId: {}", super.getTaskId());
super.onSuccess();
after();
}
@Override
public synchronized void cancel() throws JobException {
LOG.info("mtmv task cancel, taskId: {}", super.getTaskId());
super.cancel();
if (executor != null) {
executor.cancel();
@ -206,6 +210,7 @@ public class MTMVTask extends AbstractTask {
@Override
public void before() throws JobException {
LOG.info("mtmv task before, taskId: {}", super.getTaskId());
super.before();
try {
Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(dbId);
@ -222,12 +227,16 @@ public class MTMVTask extends AbstractTask {
@Override
public void runTask() throws JobException {
LOG.info("mtmv task runTask, taskId: {}", super.getTaskId());
MTMVJob job = (MTMVJob) getJobOrJobException();
try {
LOG.info("mtmv task get writeLock start, taskId: {}", super.getTaskId());
job.writeLock();
LOG.info("mtmv task get writeLock end, taskId: {}", super.getTaskId());
super.runTask();
} finally {
job.writeUnlock();
LOG.info("mtmv task release writeLock, taskId: {}", super.getTaskId());
}
}