From 6f0c8fb69840fb0f73d0f1dec2cf3ad440482f0d Mon Sep 17 00:00:00 2001 From: Stalary <452024236@qq.com> Date: Sat, 7 May 2022 08:51:54 +0800 Subject: [PATCH] [Feature] CTAS support insert data (#9271) --- .../analysis/CreateTableAsSelectStmt.java | 44 ++++++++++--------- .../org/apache/doris/catalog/Catalog.java | 2 +- .../org/apache/doris/qe/StmtExecutor.java | 36 ++++++++++++++- 3 files changed, 58 insertions(+), 24 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableAsSelectStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableAsSelectStmt.java index a5d17632f6..235e497e80 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableAsSelectStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableAsSelectStmt.java @@ -21,28 +21,42 @@ import org.apache.doris.common.ErrorCode; import org.apache.doris.common.ErrorReport; import org.apache.doris.common.UserException; +import lombok.Getter; + import java.util.ArrayList; import java.util.List; /** - * Represents a CREATE TABLE AS SELECT (CTAS) statement - * Syntax: - * CREATE TABLE table_name [( column_name_list )] - * opt_engine opt_partition opt_properties KW_AS query_stmt + * Represents a CREATE TABLE AS SELECT (CTAS) statement. + * Syntax: + * CREATE TABLE table_name [( column_name_list )] + * opt_engine opt_partition opt_properties KW_AS query_stmt */ public class CreateTableAsSelectStmt extends DdlStmt { + + @Getter private final CreateTableStmt createTableStmt; + + @Getter private final List columnNames; + + @Getter private QueryStmt queryStmt; - - public CreateTableAsSelectStmt(CreateTableStmt createTableStmt, - List columnNames, QueryStmt queryStmt) { + + @Getter + private final InsertStmt insertStmt; + + protected CreateTableAsSelectStmt(CreateTableStmt createTableStmt, + List columnNames, QueryStmt queryStmt) { this.createTableStmt = createTableStmt; this.columnNames = columnNames; this.queryStmt = queryStmt; - // Insert is not currently supported + this.insertStmt = new InsertStmt(createTableStmt.getDbTbl(), queryStmt.clone()); } - + + /** + * Cannot analyze insertStmt because the table has not been created yet. + */ @Override public void analyze(Analyzer analyzer) throws UserException { // first: we analyze queryStmt before create table. @@ -63,16 +77,4 @@ public class CreateTableAsSelectStmt extends DdlStmt { ErrorReport.reportAnalysisException(ErrorCode.ERR_COL_NUMBER_NOT_MATCH); } } - - public CreateTableStmt getCreateTableStmt() { - return createTableStmt; - } - - public List getColumnNames() { - return columnNames; - } - - public QueryStmt getQueryStmt() { - return queryStmt; - } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java index df61e2b18a..b64a92a298 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java @@ -3146,7 +3146,7 @@ public class Catalog { createTableStmt.analyze(dummyRootAnalyzer); createTable(createTableStmt); } catch (UserException e) { - throw new DdlException("Failed to execute CREATE TABLE AS SELECT Reason: " + e.getMessage()); + throw new DdlException("Failed to execute CTAS Reason: " + e.getMessage()); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index 7195591a29..b8280cff63 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -20,6 +20,7 @@ package org.apache.doris.qe; import org.apache.doris.analysis.Analyzer; import org.apache.doris.analysis.CreateTableAsSelectStmt; import org.apache.doris.analysis.DdlStmt; +import org.apache.doris.analysis.DropTableStmt; import org.apache.doris.analysis.EnterStmt; import org.apache.doris.analysis.ExplainOptions; import org.apache.doris.analysis.ExportStmt; @@ -414,6 +415,8 @@ public class StmtExecutor implements ProfileWriter { handleUseStmt(); } else if (parsedStmt instanceof TransactionStmt) { handleTransactionStmt(); + } else if (parsedStmt instanceof CreateTableAsSelectStmt) { + handleCtasStmt(); } else if (parsedStmt instanceof InsertStmt) { // Must ahead of DdlStmt because InserStmt is its subclass try { handleInsertStmt(); @@ -1229,7 +1232,6 @@ public class StmtExecutor implements ProfileWriter { context.getMysqlChannel().reset(); // create plan InsertStmt insertStmt = (InsertStmt) parsedStmt; - if (insertStmt.getQueryStmt().hasOutFileClause()) { throw new DdlException("Not support OUTFILE clause in INSERT statement"); } @@ -1550,6 +1552,37 @@ public class StmtExecutor implements ProfileWriter { context.getCatalog().getExportMgr().addExportJob(exportStmt); } + private void handleCtasStmt() { + CreateTableAsSelectStmt ctasStmt = (CreateTableAsSelectStmt) this.parsedStmt; + try { + // create table + DdlExecutor.execute(context.getCatalog(), ctasStmt); + context.getState().setOk(); + } catch (Exception e) { + // Maybe our bug + LOG.warn("CTAS create table error, stmt={}", originStmt.originStmt, e); + context.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, "Unexpected exception: " + e.getMessage()); + } + // after success create table insert data + if (MysqlStateType.OK.equals(context.getState().getStateType())) { + try { + parsedStmt = ctasStmt.getInsertStmt(); + execute(); + } catch (Exception e) { + LOG.warn("CTAS insert data error, stmt={}", parsedStmt.toSql(), e); + // insert error drop table + DropTableStmt dropTableStmt = new DropTableStmt(true, ctasStmt.getCreateTableStmt().getDbTbl(), true); + try { + DdlExecutor.execute(context.getCatalog(), dropTableStmt); + } catch (Exception ex) { + LOG.warn("CTAS drop table error, stmt={}", parsedStmt.toSql(), ex); + context.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, + "Unexpected exception: " + ex.getMessage()); + } + } + } + } + public Data.PQueryStatistics getQueryStatisticsForAuditLog() { if (statisticsForAuditLog == null) { statisticsForAuditLog = Data.PQueryStatistics.newBuilder(); @@ -1573,4 +1606,3 @@ public class StmtExecutor implements ProfileWriter { return exprs.stream().map(e -> e.getType().getPrimitiveType()).collect(Collectors.toList()); } } -