[fix](mtmv)fix refresh failed when not use db before create MTMV (#34431) (#34522)

when create MTMV,we will save current ctl and db.

when refresh MTMV,will create an ConnectContext, and set same ctl, db to ctx

when db,ctx dropped, task will be failed.

But sometimes deleting a db does not actually have an impact, so changing it to not directly fail. If refreshing the data does cause an error, then giving the user an error message
This commit is contained in:
zhangdong
2024-05-08 16:03:22 +08:00
committed by GitHub
parent e085f75a43
commit 202cdb2744
5 changed files with 144 additions and 24 deletions

View File

@ -25,6 +25,7 @@ import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.TimeUtils;
@ -154,8 +155,8 @@ public class MTMVTask extends AbstractTask {
@Override
public void run() throws JobException {
LOG.info("mtmv task run, taskId: {}", super.getTaskId());
ConnectContext ctx = MTMVPlanUtil.createMTMVContext(mtmv);
try {
ConnectContext ctx = MTMVPlanUtil.createMTMVContext(mtmv);
if (LOG.isDebugEnabled()) {
String taskSessionContext = ctx.getSessionVariable().toJson().toJSONString();
if (LOG.isDebugEnabled()) {
@ -201,8 +202,15 @@ public class MTMVTask extends AbstractTask {
}
} catch (Throwable e) {
if (getStatus() == TaskStatus.RUNNING) {
LOG.warn("run task failed: ", e);
throw new JobException(e);
StringBuilder errMsg = new StringBuilder();
// when env ctl/db not exist, need give client tips
Pair<Boolean, String> pair = MTMVPlanUtil.checkEnvInfo(mtmv.getEnvInfo(), ctx);
if (!pair.first) {
errMsg.append(pair.second);
}
errMsg.append(e.getMessage());
LOG.warn("run task failed: ", errMsg.toString());
throw new JobException(errMsg.toString(), e);
} else {
// if status is not `RUNNING`,maybe the task was canceled, therefore, it is a normal situation
LOG.info("task [{}] interruption running, because status is [{}]", getTaskId(), getStatus());

View File

@ -19,11 +19,12 @@ package org.apache.doris.mtmv;
import org.apache.doris.analysis.StatementBase;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MTMV;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.TableIf.TableType;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Pair;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.mysql.privilege.Auth;
import org.apache.doris.nereids.NereidsPlanner;
@ -48,26 +49,54 @@ import java.util.Set;
public class MTMVPlanUtil {
public static ConnectContext createMTMVContext(MTMV mtmv) throws AnalysisException {
public static ConnectContext createMTMVContext(MTMV mtmv) {
ConnectContext ctx = new ConnectContext();
ctx.setEnv(Env.getCurrentEnv());
ctx.setQualifiedUser(Auth.ADMIN_USER);
ctx.setCurrentUserIdentity(UserIdentity.ADMIN);
ctx.getState().reset();
ctx.setThreadLocalInfo();
CatalogIf catalog = Env.getCurrentEnv().getCatalogMgr()
.getCatalogOrAnalysisException(mtmv.getEnvInfo().getCtlId());
ctx.changeDefaultCatalog(catalog.getName());
ctx.setDatabase(catalog.getDbOrAnalysisException(mtmv.getEnvInfo().getDbId()).getFullName());
ctx.getSessionVariable().enableFallbackToOriginalPlanner = false;
ctx.getSessionVariable().enableNereidsDML = true;
Optional<String> workloadGroup = mtmv.getWorkloadGroup();
if (workloadGroup.isPresent()) {
ctx.getSessionVariable().setWorkloadGroup(workloadGroup.get());
}
ctx.getSessionVariable().enableNereidsDML = true;
// switch catalog;
CatalogIf catalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(mtmv.getEnvInfo().getCtlId());
// if catalog not exist, it may not have any impact, so there is no error and it will be returned directly
if (catalog == null) {
return ctx;
}
ctx.changeDefaultCatalog(catalog.getName());
// use db
Optional<? extends DatabaseIf<? extends TableIf>> databaseIf = catalog.getDb(mtmv.getEnvInfo().getDbId());
// if db not exist, it may not have any impact, so there is no error and it will be returned directly
if (!databaseIf.isPresent()) {
return ctx;
}
ctx.setDatabase(databaseIf.get().getFullName());
return ctx;
}
public static Pair<Boolean, String> checkEnvInfo(EnvInfo envInfo, ConnectContext ctx) {
if (envInfo.getCtlId() != ctx.getCurrentCatalog().getId()) {
return Pair.of(false, String.format(
"The catalog selected when creating the materialized view was %s, "
+ "but now this catalog has been deleted. "
+ "Please recreate the materialized view.",
envInfo.getCtlId()));
}
if (envInfo.getDbId() != ctx.getCurrentDbId()) {
return Pair.of(false, String.format(
"The database selected when creating the materialized view was %s, "
+ "but now this database has been deleted. "
+ "Please recreate the materialized view.",
envInfo.getDbId()));
}
return Pair.of(true, "");
}
public static MTMVRelation generateMTMVRelation(MTMV mtmv, ConnectContext ctx) {
// Should not make table without data to empty relation when analyze the related table,
// so add disable rules

View File

@ -20,7 +20,6 @@ package org.apache.doris.nereids.trees.plans.visitor;
import org.apache.doris.catalog.MTMV;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.TableIf.TableType;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.mtmv.MTMVCache;
import org.apache.doris.mtmv.MTMVPlanUtil;
import org.apache.doris.nereids.trees.plans.Plan;
@ -73,17 +72,8 @@ public class TableCollector extends DefaultPlanVisitor<Plan, TableCollectorConte
if (!context.isExpand()) {
return;
}
try {
MTMVCache expandedMv = MTMVCache.from(mtmv, MTMVPlanUtil.createMTMVContext(mtmv));
expandedMv.getLogicalPlan().accept(this, context);
} catch (AnalysisException e) {
LOG.error(String.format(
"table collector expand fail, mtmv name is %s, targetTableTypes is %s",
mtmv.getName(), context.targetTableTypes), e);
throw new org.apache.doris.nereids.exceptions.AnalysisException(
String.format("expand mv and collect table fail, mv name is %s, mv sql is %s",
mtmv.getName(), mtmv.getQuerySql()), e);
}
MTMVCache expandedMv = MTMVCache.from(mtmv, MTMVPlanUtil.createMTMVContext(mtmv));
expandedMv.getLogicalPlan().accept(this, context);
}
/**