[fix](create-table)The CREATE TABLE IF NOT EXISTS AS SELECT statement should refrain from performing any INSERT operations if the table already exists (#35210)

This commit is contained in:
Calvin Kirs
2024-05-23 22:13:49 +08:00
committed by yiguolei
parent 708b5b548c
commit bbf502dfcf
11 changed files with 183 additions and 59 deletions

View File

@ -30,6 +30,7 @@ import org.apache.doris.qe.ConnectContext;
import com.google.common.base.Preconditions;
import lombok.Getter;
import lombok.Setter;
import java.util.ArrayList;
import java.util.List;
@ -54,6 +55,13 @@ public class CreateTableAsSelectStmt extends DdlStmt {
@Getter
private final InsertStmt insertStmt;
/**
* If the table has already exists, set this flag to true.
*/
@Setter
@Getter
private boolean tableHasExists = false;
protected CreateTableAsSelectStmt(CreateTableStmt createTableStmt,
List<String> columnNames, QueryStmt queryStmt) {
this.createTableStmt = createTableStmt;

View File

@ -3101,11 +3101,13 @@ public class Env {
* 9. create tablet in BE
* 10. add this table to FE's meta
* 11. add this table to ColocateGroup if necessary
* @return if CreateTableStmt.isIfNotExists is true, return true if table already exists
* otherwise return false
*/
public void createTable(CreateTableStmt stmt) throws UserException {
public boolean createTable(CreateTableStmt stmt) throws UserException {
CatalogIf<?> catalogIf = catalogMgr.getCatalogOrException(stmt.getCatalogName(),
catalog -> new DdlException(("Unknown catalog " + catalog)));
catalogIf.createTable(stmt);
return catalogIf.createTable(stmt);
}
public void createTableLike(CreateTableLikeStmt stmt) throws DdlException {

View File

@ -187,7 +187,11 @@ public interface CatalogIf<T extends DatabaseIf> {
void dropDb(DropDbStmt stmt) throws DdlException;
void createTable(CreateTableStmt stmt) throws UserException;
/**
* @return if org.apache.doris.analysis.CreateTableStmt.ifNotExists is true, return true if table exists,
* return false otherwise
*/
boolean createTable(CreateTableStmt stmt) throws UserException;
void dropTable(DropTableStmt stmt) throws DdlException;
}

View File

@ -723,14 +723,14 @@ public abstract class ExternalCatalog
}
@Override
public void createTable(CreateTableStmt stmt) throws UserException {
public boolean createTable(CreateTableStmt stmt) throws UserException {
makeSureInitialized();
if (metadataOps == null) {
LOG.warn("createTable not implemented");
return;
return false;
}
try {
metadataOps.createTable(stmt);
return metadataOps.createTable(stmt);
} catch (Exception e) {
LOG.warn("Failed to create a table.", e);
throw e;

View File

@ -1103,7 +1103,7 @@ public class InternalCatalog implements CatalogIf<Database> {
* 10. add this table to FE's meta
* 11. add this table to ColocateGroup if necessary
*/
public void createTable(CreateTableStmt stmt) throws UserException {
public boolean createTable(CreateTableStmt stmt) throws UserException {
String engineName = stmt.getEngineName();
String dbName = stmt.getDbName();
String tableName = stmt.getTableName();
@ -1128,37 +1128,40 @@ public class InternalCatalog implements CatalogIf<Database> {
if (db.getTable(tableName).isPresent()) {
if (stmt.isSetIfNotExists()) {
LOG.info("create table[{}] which already exists", tableName);
return;
return true;
} else {
ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tableName);
}
}
if (engineName.equals("olap")) {
createOlapTable(db, stmt);
return;
} else if (engineName.equals("odbc")) {
createOdbcTable(db, stmt);
return;
} else if (engineName.equals("mysql")) {
createMysqlTable(db, stmt);
return;
} else if (engineName.equals("broker")) {
createBrokerTable(db, stmt);
return;
} else if (engineName.equalsIgnoreCase("elasticsearch") || engineName.equalsIgnoreCase("es")) {
createEsTable(db, stmt);
return;
} else if (engineName.equalsIgnoreCase("hive")) {
return createOlapTable(db, stmt);
}
if (engineName.equals("odbc")) {
return createOdbcTable(db, stmt);
}
if (engineName.equals("mysql")) {
return createMysqlTable(db, stmt);
}
if (engineName.equals("broker")) {
return createBrokerTable(db, stmt);
}
if (engineName.equalsIgnoreCase("elasticsearch") || engineName.equalsIgnoreCase("es")) {
return createEsTable(db, stmt);
}
if (engineName.equalsIgnoreCase("hive")) {
// should use hive catalog to create external hive table
throw new UserException("Cannot create hive table in internal catalog, should switch to hive catalog.");
} else if (engineName.equalsIgnoreCase("jdbc")) {
createJdbcTable(db, stmt);
return;
}
if (engineName.equalsIgnoreCase("jdbc")) {
return createJdbcTable(db, stmt);
} else {
ErrorReport.reportDdlException(ErrorCode.ERR_UNKNOWN_STORAGE_ENGINE, engineName);
}
Preconditions.checkState(false);
return false;
}
public void createTableLike(CreateTableLikeStmt stmt) throws DdlException {
@ -1316,7 +1319,8 @@ public class InternalCatalog implements CatalogIf<Database> {
}
Analyzer dummyRootAnalyzer = new Analyzer(Env.getCurrentEnv(), ConnectContext.get());
createTableStmt.analyze(dummyRootAnalyzer);
createTable(createTableStmt);
boolean tableHasExists = createTable(createTableStmt);
stmt.setTableHasExists(tableHasExists);
} catch (UserException e) {
throw new DdlException("Failed to execute CTAS Reason: " + e.getMessage());
}
@ -2030,12 +2034,12 @@ public class InternalCatalog implements CatalogIf<Database> {
}
// Create olap table and related base index synchronously.
private void createOlapTable(Database db, CreateTableStmt stmt) throws UserException {
private boolean createOlapTable(Database db, CreateTableStmt stmt) throws UserException {
String tableName = stmt.getTableName();
if (LOG.isDebugEnabled()) {
LOG.debug("begin create olap table: {}", tableName);
}
boolean tableHasExist = false;
BinlogConfig dbBinlogConfig;
db.readLock();
try {
@ -2794,9 +2798,10 @@ public class InternalCatalog implements CatalogIf<Database> {
throw t;
}
}
return tableHasExist;
}
private void createMysqlTable(Database db, CreateTableStmt stmt) throws DdlException {
private boolean createMysqlTable(Database db, CreateTableStmt stmt) throws DdlException {
String tableName = stmt.getTableName();
List<Column> columns = stmt.getColumns();
@ -2804,26 +2809,22 @@ public class InternalCatalog implements CatalogIf<Database> {
long tableId = Env.getCurrentEnv().getNextId();
MysqlTable mysqlTable = new MysqlTable(tableId, tableName, columns, stmt.getProperties());
mysqlTable.setComment(stmt.getComment());
if (!db.createTableWithLock(mysqlTable, false, stmt.isSetIfNotExists()).first) {
ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tableName);
}
LOG.info("successfully create table[{}-{}]", tableName, tableId);
Pair<Boolean, Boolean> result = db.createTableWithLock(mysqlTable, false, stmt.isSetIfNotExists());
return checkCreateTableResult(tableName, tableId, result);
}
private void createOdbcTable(Database db, CreateTableStmt stmt) throws DdlException {
private boolean createOdbcTable(Database db, CreateTableStmt stmt) throws DdlException {
String tableName = stmt.getTableName();
List<Column> columns = stmt.getColumns();
long tableId = Env.getCurrentEnv().getNextId();
OdbcTable odbcTable = new OdbcTable(tableId, tableName, columns, stmt.getProperties());
odbcTable.setComment(stmt.getComment());
if (!db.createTableWithLock(odbcTable, false, stmt.isSetIfNotExists()).first) {
ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tableName);
}
LOG.info("successfully create table[{}-{}]", tableName, tableId);
Pair<Boolean, Boolean> result = db.createTableWithLock(odbcTable, false, stmt.isSetIfNotExists());
return checkCreateTableResult(tableName, tableId, result);
}
private Table createEsTable(Database db, CreateTableStmt stmt) throws DdlException, AnalysisException {
private boolean createEsTable(Database db, CreateTableStmt stmt) throws DdlException, AnalysisException {
String tableName = stmt.getTableName();
// validate props to get column from es.
@ -2856,14 +2857,11 @@ public class InternalCatalog implements CatalogIf<Database> {
esTable.setId(tableId);
esTable.setComment(stmt.getComment());
esTable.syncTableMetaData();
if (!db.createTableWithLock(esTable, false, stmt.isSetIfNotExists()).first) {
ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tableName);
}
LOG.info("successfully create table{} with id {}", tableName, tableId);
return esTable;
Pair<Boolean, Boolean> result = db.createTableWithLock(esTable, false, stmt.isSetIfNotExists());
return checkCreateTableResult(tableName, tableId, result);
}
private void createBrokerTable(Database db, CreateTableStmt stmt) throws DdlException {
private boolean createBrokerTable(Database db, CreateTableStmt stmt) throws DdlException {
String tableName = stmt.getTableName();
List<Column> columns = stmt.getColumns();
@ -2872,11 +2870,8 @@ public class InternalCatalog implements CatalogIf<Database> {
BrokerTable brokerTable = new BrokerTable(tableId, tableName, columns, stmt.getProperties());
brokerTable.setComment(stmt.getComment());
brokerTable.setBrokerProperties(stmt.getExtProperties());
if (!db.createTableWithLock(brokerTable, false, stmt.isSetIfNotExists()).first) {
ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tableName);
}
LOG.info("successfully create table[{}-{}]", tableName, tableId);
Pair<Boolean, Boolean> result = db.createTableWithLock(brokerTable, false, stmt.isSetIfNotExists());
return checkCreateTableResult(tableName, tableId, result);
}
private void createHiveTable(Database db, CreateTableStmt stmt) throws DdlException {
@ -2903,7 +2898,7 @@ public class InternalCatalog implements CatalogIf<Database> {
LOG.info("successfully create table[{}-{}]", tableName, tableId);
}
private void createJdbcTable(Database db, CreateTableStmt stmt) throws DdlException {
private boolean createJdbcTable(Database db, CreateTableStmt stmt) throws DdlException {
String tableName = stmt.getTableName();
List<Column> columns = stmt.getColumns();
@ -2912,10 +2907,20 @@ public class InternalCatalog implements CatalogIf<Database> {
JdbcTable jdbcTable = new JdbcTable(tableId, tableName, columns, stmt.getProperties());
jdbcTable.setComment(stmt.getComment());
// check table if exists
if (!db.createTableWithLock(jdbcTable, false, stmt.isSetIfNotExists()).first) {
Pair<Boolean, Boolean> result = db.createTableWithLock(jdbcTable, false, stmt.isSetIfNotExists());
return checkCreateTableResult(tableName, tableId, result);
}
private boolean checkCreateTableResult(String tableName, long tableId, Pair<Boolean, Boolean> result)
throws DdlException {
if (Boolean.FALSE.equals(result.first)) {
ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tableName);
}
if (Boolean.TRUE.equals(result.second)) {
return true;
}
LOG.info("successfully create table[{}-{}]", tableName, tableId);
return false;
}
@VisibleForTesting

View File

@ -147,7 +147,7 @@ public class HiveMetadataOps implements ExternalMetadataOps {
}
@Override
public void createTable(CreateTableStmt stmt) throws UserException {
public boolean createTable(CreateTableStmt stmt) throws UserException {
String dbName = stmt.getDbName();
String tblName = stmt.getTableName();
ExternalDatabase<?> db = catalog.getDbNullable(dbName);
@ -157,7 +157,7 @@ public class HiveMetadataOps implements ExternalMetadataOps {
if (tableExist(dbName, tblName)) {
if (stmt.isSetIfNotExists()) {
LOG.info("create table[{}] which already exists", tblName);
return;
return true;
} else {
ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tblName);
}
@ -225,6 +225,7 @@ public class HiveMetadataOps implements ExternalMetadataOps {
} catch (Exception e) {
throw new UserException(e.getMessage(), e);
}
return false;
}
@Override

View File

@ -119,7 +119,7 @@ public class IcebergMetadataOps implements ExternalMetadataOps {
}
@Override
public void createTable(CreateTableStmt stmt) throws UserException {
public boolean createTable(CreateTableStmt stmt) throws UserException {
String dbName = stmt.getDbName();
ExternalDatabase<?> db = dorisCatalog.getDbNullable(dbName);
if (db == null) {
@ -129,7 +129,7 @@ public class IcebergMetadataOps implements ExternalMetadataOps {
if (tableExist(dbName, tableName)) {
if (stmt.isSetIfNotExists()) {
LOG.info("create table[{}] which already exists", tableName);
return;
return true;
} else {
ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tableName);
}
@ -147,6 +147,7 @@ public class IcebergMetadataOps implements ExternalMetadataOps {
PartitionSpec partitionSpec = IcebergUtils.solveIcebergPartitionSpec(stmt.getPartitionDesc(), schema);
catalog.createTable(TableIdentifier.of(dbName, tableName), schema, partitionSpec, properties);
db.setUnInitialized(true);
return false;
}
@Override

View File

@ -48,9 +48,10 @@ public interface ExternalMetadataOps {
/**
*
* @param stmt
* @return if set isExists is true, return true if table exists, otherwise return false
* @throws UserException
*/
void createTable(CreateTableStmt stmt) throws UserException;
boolean createTable(CreateTableStmt stmt) throws UserException;
/**
*

View File

@ -151,7 +151,9 @@ public class CreateTableCommand extends Command implements ForwardWithSync {
ctx.queryId(), createTableInfo.getTableName());
}
try {
Env.getCurrentEnv().createTable(createTableStmt);
if (Env.getCurrentEnv().createTable(createTableStmt)) {
return;
}
} catch (Exception e) {
throw new AnalysisException(e.getMessage(), e.getCause());
}

View File

@ -2694,6 +2694,9 @@ public class StmtExecutor {
context.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, "Unexpected exception: " + e.getMessage());
return;
}
if (ctasStmt.isTableHasExists()) {
return;
}
// after success create table insert data
try {
parsedStmt = ctasStmt.getInsertStmt();

View File

@ -0,0 +1,97 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
import org.junit.Assert;
suite("test_create_table_if_not_exists_as_select") {
def base_table_name = "test_create_table_if_not_exists_as_select_base_table"
def table_name = "test_create_table_if_not_exists_as_select_table"
sql """drop table if exists `${base_table_name}`"""
sql """drop table if exists `${table_name}`"""
sql """SET enable_fallback_to_original_planner=false"""
sql """
CREATE TABLE `${base_table_name}` (
`user_id` LARGEINT NOT NULL COMMENT '\"用户id\"',
`date` DATE NOT NULL COMMENT '\"数据灌入日期时间\"',
`num` SMALLINT NOT NULL COMMENT '\"数量\"'
) ENGINE=OLAP
DUPLICATE KEY(`user_id`, `date`, `num`)
COMMENT 'OLAP'
PARTITION BY RANGE(`date`)
(PARTITION p201701_1000 VALUES [('0000-01-01'), ('2017-02-01')),
PARTITION p201702_2000 VALUES [('2017-02-01'), ('2017-03-01')),
PARTITION p201703_3000 VALUES [('2017-03-01'), ('2017-04-01')),
PARTITION p201704_all VALUES [('2017-04-01'), ('2017-05-01')))
DISTRIBUTED BY HASH(`user_id`) BUCKETS 2
PROPERTIES ('replication_num' = '1') ;
"""
try{
sql """
CREATE TABLE `${base_table_name}` (
`user_id` LARGEINT NOT NULL COMMENT '\"用户id\"',
`date` DATE NOT NULL COMMENT '\"数据灌入日期时间\"',
`num` SMALLINT NOT NULL COMMENT '\"数量\"'
) ENGINE=OLAP
DUPLICATE KEY(`user_id`, `date`, `num`)
COMMENT 'OLAP'
PARTITION BY RANGE(`date`)
(PARTITION p201701_1000 VALUES [('0000-01-01'), ('2017-02-01')),
PARTITION p201702_2000 VALUES [('2017-02-01'), ('2017-03-01')),
PARTITION p201703_3000 VALUES [('2017-03-01'), ('2017-04-01')),
PARTITION p201704_all VALUES [('2017-04-01'), ('2017-05-01')))
DISTRIBUTED BY HASH(`user_id`) BUCKETS 2
PROPERTIES ('replication_num' = '1') ;
"""
}catch (Exception e){
println(e.getMessage())
Assert.assertTrue(e.getMessage().contains("Table 'test_create_table_if_not_exists_as_select_base_table' already exists"))
}
sql """
insert into ${base_table_name} values(1,"2017-01-15",1);
"""
sql """
create table if not exists ${table_name} PROPERTIES("replication_num"="1") as select * from ${base_table_name}
"""
def firstExecuteCount = sql """select count(*) from ${table_name}"""
assertEquals(1, firstExecuteCount[0][0]);
sql """
create table if not exists ${table_name} PROPERTIES("replication_num"="1") as select * from ${base_table_name}
"""
def secondExecuteCount = sql """select count(*) from ${table_name}"""
assertEquals(1, secondExecuteCount[0][0]);
sql """
SET enable_nereids_planner=false;
"""
sql """drop table if exists `${table_name}`"""
sql """
create table if not exists ${table_name} PROPERTIES("replication_num"="1") as select * from ${base_table_name}
"""
def originalFirstExecuteCount = sql """select count(*) from ${table_name}"""
assertEquals(1, originalFirstExecuteCount[0][0]);
sql """
create table if not exists ${table_name} PROPERTIES("replication_num"="1") as select * from ${base_table_name}
"""
def originalSecondExecuteCount = sql """select count(*) from ${table_name}"""
assertEquals(1, originalSecondExecuteCount[0][0]);
try{
sql """ create table ${table_name} PROPERTIES("replication_num"="1") as select * from ${base_table_name} """
}catch (Exception e){
Assert.assertTrue(e.getMessage().contains("Table 'test_create_table_if_not_exists_as_select_table' already exists"));
}
}