[Feature] CTAS support insert data (#9271)

This commit is contained in:
Stalary
2022-05-07 08:51:54 +08:00
committed by GitHub
parent 659417c5c9
commit 6f0c8fb698
3 changed files with 58 additions and 24 deletions

View File

@ -21,28 +21,42 @@ import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.UserException;
import lombok.Getter;
import java.util.ArrayList;
import java.util.List;
/**
* Represents a CREATE TABLE AS SELECT (CTAS) statement
* Syntax:
* CREATE TABLE table_name [( column_name_list )]
* opt_engine opt_partition opt_properties KW_AS query_stmt
* Represents a CREATE TABLE AS SELECT (CTAS) statement.
* Syntax:
* CREATE TABLE table_name [( column_name_list )]
* opt_engine opt_partition opt_properties KW_AS query_stmt
*/
public class CreateTableAsSelectStmt extends DdlStmt {
@Getter
private final CreateTableStmt createTableStmt;
@Getter
private final List<String> columnNames;
@Getter
private QueryStmt queryStmt;
public CreateTableAsSelectStmt(CreateTableStmt createTableStmt,
List<String> columnNames, QueryStmt queryStmt) {
@Getter
private final InsertStmt insertStmt;
protected CreateTableAsSelectStmt(CreateTableStmt createTableStmt,
List<String> columnNames, QueryStmt queryStmt) {
this.createTableStmt = createTableStmt;
this.columnNames = columnNames;
this.queryStmt = queryStmt;
// Insert is not currently supported
this.insertStmt = new InsertStmt(createTableStmt.getDbTbl(), queryStmt.clone());
}
/**
* Cannot analyze insertStmt because the table has not been created yet.
*/
@Override
public void analyze(Analyzer analyzer) throws UserException {
// first: we analyze queryStmt before create table.
@ -63,16 +77,4 @@ public class CreateTableAsSelectStmt extends DdlStmt {
ErrorReport.reportAnalysisException(ErrorCode.ERR_COL_NUMBER_NOT_MATCH);
}
}
public CreateTableStmt getCreateTableStmt() {
return createTableStmt;
}
public List<String> getColumnNames() {
return columnNames;
}
public QueryStmt getQueryStmt() {
return queryStmt;
}
}

View File

@ -3146,7 +3146,7 @@ public class Catalog {
createTableStmt.analyze(dummyRootAnalyzer);
createTable(createTableStmt);
} catch (UserException e) {
throw new DdlException("Failed to execute CREATE TABLE AS SELECT Reason: " + e.getMessage());
throw new DdlException("Failed to execute CTAS Reason: " + e.getMessage());
}
}

View File

@ -20,6 +20,7 @@ package org.apache.doris.qe;
import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.CreateTableAsSelectStmt;
import org.apache.doris.analysis.DdlStmt;
import org.apache.doris.analysis.DropTableStmt;
import org.apache.doris.analysis.EnterStmt;
import org.apache.doris.analysis.ExplainOptions;
import org.apache.doris.analysis.ExportStmt;
@ -414,6 +415,8 @@ public class StmtExecutor implements ProfileWriter {
handleUseStmt();
} else if (parsedStmt instanceof TransactionStmt) {
handleTransactionStmt();
} else if (parsedStmt instanceof CreateTableAsSelectStmt) {
handleCtasStmt();
} else if (parsedStmt instanceof InsertStmt) { // Must ahead of DdlStmt because InserStmt is its subclass
try {
handleInsertStmt();
@ -1229,7 +1232,6 @@ public class StmtExecutor implements ProfileWriter {
context.getMysqlChannel().reset();
// create plan
InsertStmt insertStmt = (InsertStmt) parsedStmt;
if (insertStmt.getQueryStmt().hasOutFileClause()) {
throw new DdlException("Not support OUTFILE clause in INSERT statement");
}
@ -1550,6 +1552,37 @@ public class StmtExecutor implements ProfileWriter {
context.getCatalog().getExportMgr().addExportJob(exportStmt);
}
private void handleCtasStmt() {
CreateTableAsSelectStmt ctasStmt = (CreateTableAsSelectStmt) this.parsedStmt;
try {
// create table
DdlExecutor.execute(context.getCatalog(), ctasStmt);
context.getState().setOk();
} catch (Exception e) {
// Maybe our bug
LOG.warn("CTAS create table error, stmt={}", originStmt.originStmt, e);
context.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, "Unexpected exception: " + e.getMessage());
}
// after success create table insert data
if (MysqlStateType.OK.equals(context.getState().getStateType())) {
try {
parsedStmt = ctasStmt.getInsertStmt();
execute();
} catch (Exception e) {
LOG.warn("CTAS insert data error, stmt={}", parsedStmt.toSql(), e);
// insert error drop table
DropTableStmt dropTableStmt = new DropTableStmt(true, ctasStmt.getCreateTableStmt().getDbTbl(), true);
try {
DdlExecutor.execute(context.getCatalog(), dropTableStmt);
} catch (Exception ex) {
LOG.warn("CTAS drop table error, stmt={}", parsedStmt.toSql(), ex);
context.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR,
"Unexpected exception: " + ex.getMessage());
}
}
}
}
public Data.PQueryStatistics getQueryStatisticsForAuditLog() {
if (statisticsForAuditLog == null) {
statisticsForAuditLog = Data.PQueryStatistics.newBuilder();
@ -1573,4 +1606,3 @@ public class StmtExecutor implements ProfileWriter {
return exprs.stream().map(e -> e.getType().getPrimitiveType()).collect(Collectors.toList());
}
}