From e92d2fcb5a7477be0ddd71ac0aa7143322a08aa3 Mon Sep 17 00:00:00 2001 From: meiyi Date: Wed, 8 Nov 2023 22:10:06 +0800 Subject: [PATCH] [improvement](group commit) Group commit insert into can be executed on observer fe (#26589) --- .../apache/doris/analysis/NativeInsertStmt.java | 16 +++++++++++++--- .../java/org/apache/doris/qe/StmtExecutor.java | 4 ++++ 2 files changed, 17 insertions(+), 3 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java index 1837c26891..219c9b9373 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java @@ -365,7 +365,7 @@ public class NativeInsertStmt extends InsertStmt { analyzeTargetTable(analyzer); db = analyzer.getEnv().getCatalogMgr().getCatalog(tblName.getCtl()).getDbOrAnalysisException(tblName.getDb()); - analyzeGroupCommit(); + analyzeGroupCommit(analyzer); if (isGroupCommit()) { return; } @@ -1067,14 +1067,24 @@ public class NativeInsertStmt extends InsertStmt { @Override public RedirectStatus getRedirectStatus() { - if (isExplain()) { + if (isExplain() || isGroupCommit()) { return RedirectStatus.NO_FORWARD; } else { return RedirectStatus.FORWARD_WITH_SYNC; } } - private void analyzeGroupCommit() { + public void analyzeGroupCommit(Analyzer analyzer) { + if (isGroupCommit) { + return; + } + try { + tblName.analyze(analyzer); + initTargetTable(analyzer); + } catch (Throwable e) { + LOG.warn("analyze group commit failed", e); + return; + } if (ConnectContext.get().getSessionVariable().enableInsertGroupCommit && targetTable instanceof OlapTable && !ConnectContext.get().isTxnModel() diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index 8eaff7fdf1..49a3e45608 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -1109,6 +1109,10 @@ public class StmtExecutor { analyzeVariablesInStmt(); } + if (context.getSessionVariable().enableInsertGroupCommit && parsedStmt instanceof NativeInsertStmt) { + NativeInsertStmt nativeInsertStmt = (NativeInsertStmt) parsedStmt; + nativeInsertStmt.analyzeGroupCommit(new Analyzer(context.getEnv(), context)); + } redirectStatus = parsedStmt.getRedirectStatus(); }