From bbf502dfcfd6d6d81d536498563aa9b520b5a675 Mon Sep 17 00:00:00 2001 From: Calvin Kirs Date: Thu, 23 May 2024 22:13:49 +0800 Subject: [PATCH] [fix](create-table)The CREATE TABLE IF NOT EXISTS AS SELECT statement should refrain from performing any INSERT operations if the table already exists (#35210) --- .../analysis/CreateTableAsSelectStmt.java | 8 ++ .../java/org/apache/doris/catalog/Env.java | 6 +- .../apache/doris/datasource/CatalogIf.java | 6 +- .../doris/datasource/ExternalCatalog.java | 6 +- .../doris/datasource/InternalCatalog.java | 99 ++++++++++--------- .../datasource/hive/HiveMetadataOps.java | 5 +- .../iceberg/IcebergMetadataOps.java | 5 +- .../operations/ExternalMetadataOps.java | 3 +- .../plans/commands/CreateTableCommand.java | 4 +- .../org/apache/doris/qe/StmtExecutor.java | 3 + ...reate_table_if_not_exists_as_select.groovy | 97 ++++++++++++++++++ 11 files changed, 183 insertions(+), 59 deletions(-) create mode 100644 regression-test/suites/table_p0/test_create_table_if_not_exists_as_select.groovy diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableAsSelectStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableAsSelectStmt.java index c98fa9414a..9315817e69 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableAsSelectStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableAsSelectStmt.java @@ -30,6 +30,7 @@ import org.apache.doris.qe.ConnectContext; import com.google.common.base.Preconditions; import lombok.Getter; +import lombok.Setter; import java.util.ArrayList; import java.util.List; @@ -54,6 +55,13 @@ public class CreateTableAsSelectStmt extends DdlStmt { @Getter private final InsertStmt insertStmt; + /** + * If the table has already exists, set this flag to true. + */ + @Setter + @Getter + private boolean tableHasExists = false; + protected CreateTableAsSelectStmt(CreateTableStmt createTableStmt, List columnNames, QueryStmt queryStmt) { this.createTableStmt = createTableStmt; diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index 5dffb93121..81a8b682c7 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -3101,11 +3101,13 @@ public class Env { * 9. create tablet in BE * 10. add this table to FE's meta * 11. add this table to ColocateGroup if necessary + * @return if CreateTableStmt.isIfNotExists is true, return true if table already exists + * otherwise return false */ - public void createTable(CreateTableStmt stmt) throws UserException { + public boolean createTable(CreateTableStmt stmt) throws UserException { CatalogIf catalogIf = catalogMgr.getCatalogOrException(stmt.getCatalogName(), catalog -> new DdlException(("Unknown catalog " + catalog))); - catalogIf.createTable(stmt); + return catalogIf.createTable(stmt); } public void createTableLike(CreateTableLikeStmt stmt) throws DdlException { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogIf.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogIf.java index 072597deb5..ceee1a6815 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogIf.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogIf.java @@ -187,7 +187,11 @@ public interface CatalogIf { void dropDb(DropDbStmt stmt) throws DdlException; - void createTable(CreateTableStmt stmt) throws UserException; + /** + * @return if org.apache.doris.analysis.CreateTableStmt.ifNotExists is true, return true if table exists, + * return false otherwise + */ + boolean createTable(CreateTableStmt stmt) throws UserException; void dropTable(DropTableStmt stmt) throws DdlException; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java index 2f8186db72..ad2f53c7ab 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java @@ -723,14 +723,14 @@ public abstract class ExternalCatalog } @Override - public void createTable(CreateTableStmt stmt) throws UserException { + public boolean createTable(CreateTableStmt stmt) throws UserException { makeSureInitialized(); if (metadataOps == null) { LOG.warn("createTable not implemented"); - return; + return false; } try { - metadataOps.createTable(stmt); + return metadataOps.createTable(stmt); } catch (Exception e) { LOG.warn("Failed to create a table.", e); throw e; diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java index 41ef985f17..5d06378ba9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java @@ -1103,7 +1103,7 @@ public class InternalCatalog implements CatalogIf { * 10. add this table to FE's meta * 11. add this table to ColocateGroup if necessary */ - public void createTable(CreateTableStmt stmt) throws UserException { + public boolean createTable(CreateTableStmt stmt) throws UserException { String engineName = stmt.getEngineName(); String dbName = stmt.getDbName(); String tableName = stmt.getTableName(); @@ -1128,37 +1128,40 @@ public class InternalCatalog implements CatalogIf { if (db.getTable(tableName).isPresent()) { if (stmt.isSetIfNotExists()) { LOG.info("create table[{}] which already exists", tableName); - return; + return true; } else { ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tableName); } } if (engineName.equals("olap")) { - createOlapTable(db, stmt); - return; - } else if (engineName.equals("odbc")) { - createOdbcTable(db, stmt); - return; - } else if (engineName.equals("mysql")) { - createMysqlTable(db, stmt); - return; - } else if (engineName.equals("broker")) { - createBrokerTable(db, stmt); - return; - } else if (engineName.equalsIgnoreCase("elasticsearch") || engineName.equalsIgnoreCase("es")) { - createEsTable(db, stmt); - return; - } else if (engineName.equalsIgnoreCase("hive")) { + return createOlapTable(db, stmt); + } + if (engineName.equals("odbc")) { + return createOdbcTable(db, stmt); + } + if (engineName.equals("mysql")) { + return createMysqlTable(db, stmt); + } + if (engineName.equals("broker")) { + return createBrokerTable(db, stmt); + } + if (engineName.equalsIgnoreCase("elasticsearch") || engineName.equalsIgnoreCase("es")) { + return createEsTable(db, stmt); + } + if (engineName.equalsIgnoreCase("hive")) { // should use hive catalog to create external hive table throw new UserException("Cannot create hive table in internal catalog, should switch to hive catalog."); - } else if (engineName.equalsIgnoreCase("jdbc")) { - createJdbcTable(db, stmt); - return; + } + if (engineName.equalsIgnoreCase("jdbc")) { + return createJdbcTable(db, stmt); + } else { ErrorReport.reportDdlException(ErrorCode.ERR_UNKNOWN_STORAGE_ENGINE, engineName); } + Preconditions.checkState(false); + return false; } public void createTableLike(CreateTableLikeStmt stmt) throws DdlException { @@ -1316,7 +1319,8 @@ public class InternalCatalog implements CatalogIf { } Analyzer dummyRootAnalyzer = new Analyzer(Env.getCurrentEnv(), ConnectContext.get()); createTableStmt.analyze(dummyRootAnalyzer); - createTable(createTableStmt); + boolean tableHasExists = createTable(createTableStmt); + stmt.setTableHasExists(tableHasExists); } catch (UserException e) { throw new DdlException("Failed to execute CTAS Reason: " + e.getMessage()); } @@ -2030,12 +2034,12 @@ public class InternalCatalog implements CatalogIf { } // Create olap table and related base index synchronously. - private void createOlapTable(Database db, CreateTableStmt stmt) throws UserException { + private boolean createOlapTable(Database db, CreateTableStmt stmt) throws UserException { String tableName = stmt.getTableName(); if (LOG.isDebugEnabled()) { LOG.debug("begin create olap table: {}", tableName); } - + boolean tableHasExist = false; BinlogConfig dbBinlogConfig; db.readLock(); try { @@ -2794,9 +2798,10 @@ public class InternalCatalog implements CatalogIf { throw t; } } + return tableHasExist; } - private void createMysqlTable(Database db, CreateTableStmt stmt) throws DdlException { + private boolean createMysqlTable(Database db, CreateTableStmt stmt) throws DdlException { String tableName = stmt.getTableName(); List columns = stmt.getColumns(); @@ -2804,26 +2809,22 @@ public class InternalCatalog implements CatalogIf { long tableId = Env.getCurrentEnv().getNextId(); MysqlTable mysqlTable = new MysqlTable(tableId, tableName, columns, stmt.getProperties()); mysqlTable.setComment(stmt.getComment()); - if (!db.createTableWithLock(mysqlTable, false, stmt.isSetIfNotExists()).first) { - ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tableName); - } - LOG.info("successfully create table[{}-{}]", tableName, tableId); + Pair result = db.createTableWithLock(mysqlTable, false, stmt.isSetIfNotExists()); + return checkCreateTableResult(tableName, tableId, result); } - private void createOdbcTable(Database db, CreateTableStmt stmt) throws DdlException { + private boolean createOdbcTable(Database db, CreateTableStmt stmt) throws DdlException { String tableName = stmt.getTableName(); List columns = stmt.getColumns(); long tableId = Env.getCurrentEnv().getNextId(); OdbcTable odbcTable = new OdbcTable(tableId, tableName, columns, stmt.getProperties()); odbcTable.setComment(stmt.getComment()); - if (!db.createTableWithLock(odbcTable, false, stmt.isSetIfNotExists()).first) { - ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tableName); - } - LOG.info("successfully create table[{}-{}]", tableName, tableId); + Pair result = db.createTableWithLock(odbcTable, false, stmt.isSetIfNotExists()); + return checkCreateTableResult(tableName, tableId, result); } - private Table createEsTable(Database db, CreateTableStmt stmt) throws DdlException, AnalysisException { + private boolean createEsTable(Database db, CreateTableStmt stmt) throws DdlException, AnalysisException { String tableName = stmt.getTableName(); // validate props to get column from es. @@ -2856,14 +2857,11 @@ public class InternalCatalog implements CatalogIf { esTable.setId(tableId); esTable.setComment(stmt.getComment()); esTable.syncTableMetaData(); - if (!db.createTableWithLock(esTable, false, stmt.isSetIfNotExists()).first) { - ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tableName); - } - LOG.info("successfully create table{} with id {}", tableName, tableId); - return esTable; + Pair result = db.createTableWithLock(esTable, false, stmt.isSetIfNotExists()); + return checkCreateTableResult(tableName, tableId, result); } - private void createBrokerTable(Database db, CreateTableStmt stmt) throws DdlException { + private boolean createBrokerTable(Database db, CreateTableStmt stmt) throws DdlException { String tableName = stmt.getTableName(); List columns = stmt.getColumns(); @@ -2872,11 +2870,8 @@ public class InternalCatalog implements CatalogIf { BrokerTable brokerTable = new BrokerTable(tableId, tableName, columns, stmt.getProperties()); brokerTable.setComment(stmt.getComment()); brokerTable.setBrokerProperties(stmt.getExtProperties()); - - if (!db.createTableWithLock(brokerTable, false, stmt.isSetIfNotExists()).first) { - ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tableName); - } - LOG.info("successfully create table[{}-{}]", tableName, tableId); + Pair result = db.createTableWithLock(brokerTable, false, stmt.isSetIfNotExists()); + return checkCreateTableResult(tableName, tableId, result); } private void createHiveTable(Database db, CreateTableStmt stmt) throws DdlException { @@ -2903,7 +2898,7 @@ public class InternalCatalog implements CatalogIf { LOG.info("successfully create table[{}-{}]", tableName, tableId); } - private void createJdbcTable(Database db, CreateTableStmt stmt) throws DdlException { + private boolean createJdbcTable(Database db, CreateTableStmt stmt) throws DdlException { String tableName = stmt.getTableName(); List columns = stmt.getColumns(); @@ -2912,10 +2907,20 @@ public class InternalCatalog implements CatalogIf { JdbcTable jdbcTable = new JdbcTable(tableId, tableName, columns, stmt.getProperties()); jdbcTable.setComment(stmt.getComment()); // check table if exists - if (!db.createTableWithLock(jdbcTable, false, stmt.isSetIfNotExists()).first) { + Pair result = db.createTableWithLock(jdbcTable, false, stmt.isSetIfNotExists()); + return checkCreateTableResult(tableName, tableId, result); + } + + private boolean checkCreateTableResult(String tableName, long tableId, Pair result) + throws DdlException { + if (Boolean.FALSE.equals(result.first)) { ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tableName); } + if (Boolean.TRUE.equals(result.second)) { + return true; + } LOG.info("successfully create table[{}-{}]", tableName, tableId); + return false; } @VisibleForTesting diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java index 1cf6595bba..72f1932904 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java @@ -147,7 +147,7 @@ public class HiveMetadataOps implements ExternalMetadataOps { } @Override - public void createTable(CreateTableStmt stmt) throws UserException { + public boolean createTable(CreateTableStmt stmt) throws UserException { String dbName = stmt.getDbName(); String tblName = stmt.getTableName(); ExternalDatabase db = catalog.getDbNullable(dbName); @@ -157,7 +157,7 @@ public class HiveMetadataOps implements ExternalMetadataOps { if (tableExist(dbName, tblName)) { if (stmt.isSetIfNotExists()) { LOG.info("create table[{}] which already exists", tblName); - return; + return true; } else { ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tblName); } @@ -225,6 +225,7 @@ public class HiveMetadataOps implements ExternalMetadataOps { } catch (Exception e) { throw new UserException(e.getMessage(), e); } + return false; } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java index bd1dcdd3e6..c59db7b4b7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java @@ -119,7 +119,7 @@ public class IcebergMetadataOps implements ExternalMetadataOps { } @Override - public void createTable(CreateTableStmt stmt) throws UserException { + public boolean createTable(CreateTableStmt stmt) throws UserException { String dbName = stmt.getDbName(); ExternalDatabase db = dorisCatalog.getDbNullable(dbName); if (db == null) { @@ -129,7 +129,7 @@ public class IcebergMetadataOps implements ExternalMetadataOps { if (tableExist(dbName, tableName)) { if (stmt.isSetIfNotExists()) { LOG.info("create table[{}] which already exists", tableName); - return; + return true; } else { ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tableName); } @@ -147,6 +147,7 @@ public class IcebergMetadataOps implements ExternalMetadataOps { PartitionSpec partitionSpec = IcebergUtils.solveIcebergPartitionSpec(stmt.getPartitionDesc(), schema); catalog.createTable(TableIdentifier.of(dbName, tableName), schema, partitionSpec, properties); db.setUnInitialized(true); + return false; } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/operations/ExternalMetadataOps.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/operations/ExternalMetadataOps.java index cb6ba35c14..b603b7a3ca 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/operations/ExternalMetadataOps.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/operations/ExternalMetadataOps.java @@ -48,9 +48,10 @@ public interface ExternalMetadataOps { /** * * @param stmt + * @return if set isExists is true, return true if table exists, otherwise return false * @throws UserException */ - void createTable(CreateTableStmt stmt) throws UserException; + boolean createTable(CreateTableStmt stmt) throws UserException; /** * diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateTableCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateTableCommand.java index 613c29136d..e70ad87aa5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateTableCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateTableCommand.java @@ -151,7 +151,9 @@ public class CreateTableCommand extends Command implements ForwardWithSync { ctx.queryId(), createTableInfo.getTableName()); } try { - Env.getCurrentEnv().createTable(createTableStmt); + if (Env.getCurrentEnv().createTable(createTableStmt)) { + return; + } } catch (Exception e) { throw new AnalysisException(e.getMessage(), e.getCause()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index c2b0e9dd44..c90ca0b5ff 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -2694,6 +2694,9 @@ public class StmtExecutor { context.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, "Unexpected exception: " + e.getMessage()); return; } + if (ctasStmt.isTableHasExists()) { + return; + } // after success create table insert data try { parsedStmt = ctasStmt.getInsertStmt(); diff --git a/regression-test/suites/table_p0/test_create_table_if_not_exists_as_select.groovy b/regression-test/suites/table_p0/test_create_table_if_not_exists_as_select.groovy new file mode 100644 index 0000000000..0cd5cf335c --- /dev/null +++ b/regression-test/suites/table_p0/test_create_table_if_not_exists_as_select.groovy @@ -0,0 +1,97 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.junit.Assert; + +suite("test_create_table_if_not_exists_as_select") { + def base_table_name = "test_create_table_if_not_exists_as_select_base_table" + def table_name = "test_create_table_if_not_exists_as_select_table" + sql """drop table if exists `${base_table_name}`""" + sql """drop table if exists `${table_name}`""" + sql """SET enable_fallback_to_original_planner=false""" + + sql """ + CREATE TABLE `${base_table_name}` ( + `user_id` LARGEINT NOT NULL COMMENT '\"用户id\"', + `date` DATE NOT NULL COMMENT '\"数据灌入日期时间\"', + `num` SMALLINT NOT NULL COMMENT '\"数量\"' + ) ENGINE=OLAP + DUPLICATE KEY(`user_id`, `date`, `num`) + COMMENT 'OLAP' + PARTITION BY RANGE(`date`) + (PARTITION p201701_1000 VALUES [('0000-01-01'), ('2017-02-01')), + PARTITION p201702_2000 VALUES [('2017-02-01'), ('2017-03-01')), + PARTITION p201703_3000 VALUES [('2017-03-01'), ('2017-04-01')), + PARTITION p201704_all VALUES [('2017-04-01'), ('2017-05-01'))) + DISTRIBUTED BY HASH(`user_id`) BUCKETS 2 + PROPERTIES ('replication_num' = '1') ; + """ + try{ + sql """ + CREATE TABLE `${base_table_name}` ( + `user_id` LARGEINT NOT NULL COMMENT '\"用户id\"', + `date` DATE NOT NULL COMMENT '\"数据灌入日期时间\"', + `num` SMALLINT NOT NULL COMMENT '\"数量\"' + ) ENGINE=OLAP + DUPLICATE KEY(`user_id`, `date`, `num`) + COMMENT 'OLAP' + PARTITION BY RANGE(`date`) + (PARTITION p201701_1000 VALUES [('0000-01-01'), ('2017-02-01')), + PARTITION p201702_2000 VALUES [('2017-02-01'), ('2017-03-01')), + PARTITION p201703_3000 VALUES [('2017-03-01'), ('2017-04-01')), + PARTITION p201704_all VALUES [('2017-04-01'), ('2017-05-01'))) + DISTRIBUTED BY HASH(`user_id`) BUCKETS 2 + PROPERTIES ('replication_num' = '1') ; + """ + }catch (Exception e){ + println(e.getMessage()) + Assert.assertTrue(e.getMessage().contains("Table 'test_create_table_if_not_exists_as_select_base_table' already exists")) + } + sql """ + insert into ${base_table_name} values(1,"2017-01-15",1); + """ + + sql """ + create table if not exists ${table_name} PROPERTIES("replication_num"="1") as select * from ${base_table_name} + """ + def firstExecuteCount = sql """select count(*) from ${table_name}""" + assertEquals(1, firstExecuteCount[0][0]); + sql """ + create table if not exists ${table_name} PROPERTIES("replication_num"="1") as select * from ${base_table_name} + """ + def secondExecuteCount = sql """select count(*) from ${table_name}""" + assertEquals(1, secondExecuteCount[0][0]); + sql """ + SET enable_nereids_planner=false; + """ + sql """drop table if exists `${table_name}`""" + sql """ + create table if not exists ${table_name} PROPERTIES("replication_num"="1") as select * from ${base_table_name} + """ + def originalFirstExecuteCount = sql """select count(*) from ${table_name}""" + assertEquals(1, originalFirstExecuteCount[0][0]); + sql """ + create table if not exists ${table_name} PROPERTIES("replication_num"="1") as select * from ${base_table_name} + """ + def originalSecondExecuteCount = sql """select count(*) from ${table_name}""" + assertEquals(1, originalSecondExecuteCount[0][0]); + try{ + sql """ create table ${table_name} PROPERTIES("replication_num"="1") as select * from ${base_table_name} """ + }catch (Exception e){ + Assert.assertTrue(e.getMessage().contains("Table 'test_create_table_if_not_exists_as_select_table' already exists")); + } +}