[fix](group commit) Pick some group commit pr (#38320)
Pick https://github.com/apache/doris/pull/38292, https://github.com/apache/doris/pull/34021, https://github.com/apache/doris/pull/38228, some modify of https://github.com/apache/doris/pull/37260, some modify of https://github.com/apache/doris/pull/37595
This commit is contained in:
@ -183,6 +183,9 @@ public class MaterializedViewHandler extends AlterHandler {
|
||||
*/
|
||||
public void processCreateMaterializedView(CreateMaterializedViewStmt addMVClause, Database db, OlapTable olapTable)
|
||||
throws DdlException, AnalysisException {
|
||||
// wait wal delete
|
||||
Env.getCurrentEnv().getGroupCommitManager().blockTable(olapTable.getId());
|
||||
Env.getCurrentEnv().getGroupCommitManager().waitWalFinished(olapTable.getId());
|
||||
olapTable.writeLockOrDdlException();
|
||||
try {
|
||||
olapTable.checkNormalStateForAlter();
|
||||
@ -217,11 +220,6 @@ public class MaterializedViewHandler extends AlterHandler {
|
||||
addAlterJobV2(rollupJobV2);
|
||||
|
||||
olapTable.setState(OlapTableState.ROLLUP);
|
||||
|
||||
// wait wal delete
|
||||
Env.getCurrentEnv().getGroupCommitManager().blockTable(olapTable.getId());
|
||||
Env.getCurrentEnv().getGroupCommitManager().waitWalFinished(olapTable.getId());
|
||||
|
||||
Env.getCurrentEnv().getEditLog().logAlterJob(rollupJobV2);
|
||||
LOG.info("finished to create materialized view job: {}", rollupJobV2.getJobId());
|
||||
} finally {
|
||||
@ -244,6 +242,9 @@ public class MaterializedViewHandler extends AlterHandler {
|
||||
public void processBatchAddRollup(String rawSql, List<AlterClause> alterClauses, Database db, OlapTable olapTable)
|
||||
throws DdlException, AnalysisException {
|
||||
checkReplicaCount(olapTable);
|
||||
// wait wal delete
|
||||
Env.getCurrentEnv().getGroupCommitManager().blockTable(olapTable.getId());
|
||||
Env.getCurrentEnv().getGroupCommitManager().waitWalFinished(olapTable.getId());
|
||||
Map<String, RollupJobV2> rollupNameJobMap = new LinkedHashMap<>();
|
||||
// save job id for log
|
||||
Set<Long> logJobIdSet = new HashSet<>();
|
||||
@ -305,10 +306,6 @@ public class MaterializedViewHandler extends AlterHandler {
|
||||
// but this order is more reasonable
|
||||
olapTable.setState(OlapTableState.ROLLUP);
|
||||
|
||||
// wait wal delete
|
||||
Env.getCurrentEnv().getGroupCommitManager().blockTable(olapTable.getId());
|
||||
Env.getCurrentEnv().getGroupCommitManager().waitWalFinished(olapTable.getId());
|
||||
|
||||
// 2 batch submit rollup job
|
||||
List<AlterJobV2> rollupJobV2List = new ArrayList<>(rollupNameJobMap.values());
|
||||
batchAddAlterJobV2(rollupJobV2List);
|
||||
|
||||
@ -2207,11 +2207,17 @@ public class StmtExecutor {
|
||||
errMsg = "group commit insert failed. db_id: " + dbId + ", table_id: " + tableId
|
||||
+ ", query_id: " + DebugUtil.printId(context.queryId()) + ", backend_id: "
|
||||
+ groupCommitPlanner.getBackend().getId() + ", status: " + response.getStatus();
|
||||
if (response.hasErrorUrl()) {
|
||||
errMsg += ", error url: " + response.getErrorUrl();
|
||||
}
|
||||
}
|
||||
} else if (code != TStatusCode.OK) {
|
||||
errMsg = "group commit insert failed. db_id: " + dbId + ", table_id: " + tableId + ", query_id: "
|
||||
+ DebugUtil.printId(context.queryId()) + ", backend_id: " + groupCommitPlanner.getBackend()
|
||||
.getId() + ", status: " + response.getStatus();
|
||||
if (response.hasErrorUrl()) {
|
||||
errMsg += ", error url: " + response.getErrorUrl();
|
||||
}
|
||||
ErrorReport.reportDdlException(errMsg, ErrorCode.ERR_FAILED_WHEN_INSERT);
|
||||
}
|
||||
label = response.getLabel();
|
||||
|
||||
Reference in New Issue
Block a user