diff --git a/be/src/olap/wal_manager.cpp b/be/src/olap/wal_manager.cpp index c62996da19..0fe59cdf9c 100644 --- a/be/src/olap/wal_manager.cpp +++ b/be/src/olap/wal_manager.cpp @@ -377,12 +377,10 @@ Status WalManager::replay() { std::lock_guard wrlock(_lock); auto it = _table_map.begin(); while (it != _table_map.end()) { - if (it->second->size() == 0) { - it = _table_map.erase(it); - } else { + if (it->second->size() > 0) { replay_tables.push_back(it->first); - it++; } + it++; } } for (const auto& table_id : replay_tables) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java index 1aff92b4d2..0fcd5050fc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java @@ -1109,7 +1109,12 @@ public class NativeInsertStmt extends InsertStmt { } } - public void analyzeGroupCommit(Analyzer analyzer) { + public void analyzeGroupCommit(Analyzer analyzer) throws AnalysisException { + if (isGroupCommitStreamLoadSql && (targetTable instanceof OlapTable) + && !((OlapTable) targetTable).getTableProperty().getUseSchemaLightChange()) { + throw new AnalysisException( + "table light_schema_change is false, can't do http_stream with group commit mode"); + } if (isGroupCommit) { return; } @@ -1124,6 +1129,7 @@ public class NativeInsertStmt extends InsertStmt { if (!partialUpdate && ConnectContext.get().getSessionVariable().isEnableInsertGroupCommit() && ConnectContext.get().getSessionVariable().getSqlMode() != SqlModeHelper.MODE_NO_BACKSLASH_ESCAPES && targetTable instanceof OlapTable + && ((OlapTable) targetTable).getTableProperty().getUseSchemaLightChange() && !ConnectContext.get().isTxnModel() && getQueryStmt() instanceof SelectStmt && ((SelectStmt) getQueryStmt()).getTableRefs().isEmpty() && targetPartitionNames == null diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java index 11a7926e5d..0427fea005 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java @@ -249,7 +249,7 @@ public class InsertIntoTableCommand extends Command implements ForwardWithSync, return ConnectContext.get().getSessionVariable().getSqlMode() != SqlModeHelper.MODE_NO_BACKSLASH_ESCAPES && physicalOlapTableSink.getTargetTable() instanceof OlapTable && !ConnectContext.get().isTxnModel() && sink.getFragment().getPlanRoot() instanceof UnionNode && physicalOlapTableSink.getPartitionIds() - .isEmpty(); + .isEmpty() && physicalOlapTableSink.getTargetTable().getTableProperty().getUseSchemaLightChange(); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index 75405854ca..e5b0f65101 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -2001,6 +2001,10 @@ public class FrontendServiceImpl implements FrontendService.Iface { } long timeoutMs = request.isSetThriftRpcTimeoutMs() ? request.getThriftRpcTimeoutMs() : 5000; Table table = db.getTableOrMetaException(request.getTbl(), TableType.OLAP); + if (!((OlapTable) table).getTableProperty().getUseSchemaLightChange() && (request.getGroupCommitMode() != null + && !request.getGroupCommitMode().equals("off_mode"))) { + throw new UserException("table light_schema_change is false, can't do stream load with group commit mode"); + } result.setDbId(db.getId()); result.setTableId(table.getId()); result.setBaseSchemaVersion(((OlapTable) table).getBaseSchemaVersion()); @@ -2062,6 +2066,10 @@ public class FrontendServiceImpl implements FrontendService.Iface { } long timeoutMs = request.isSetThriftRpcTimeoutMs() ? request.getThriftRpcTimeoutMs() : 5000; Table table = db.getTableOrMetaException(request.getTbl(), TableType.OLAP); + if (!((OlapTable) table).getTableProperty().getUseSchemaLightChange() && (request.getGroupCommitMode() != null + && !request.getGroupCommitMode().equals("off_mode"))) { + throw new UserException("table light_schema_change is false, can't do stream load with group commit mode"); + } return this.generatePipelineStreamLoadPut(request, db, fullDbName, (OlapTable) table, timeoutMs, 1, false); }