[feature](JdbcExternalCatalog) support insert data in JdbcExternalCatalog (#16271)
This commit is contained in:
@ -31,6 +31,8 @@ import org.apache.doris.catalog.Partition;
|
||||
import org.apache.doris.catalog.PartitionType;
|
||||
import org.apache.doris.catalog.Table;
|
||||
import org.apache.doris.catalog.TableIf;
|
||||
import org.apache.doris.catalog.external.JdbcExternalDatabase;
|
||||
import org.apache.doris.catalog.external.JdbcExternalTable;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
import org.apache.doris.common.DdlException;
|
||||
import org.apache.doris.common.ErrorCode;
|
||||
@ -39,6 +41,8 @@ import org.apache.doris.common.Pair;
|
||||
import org.apache.doris.common.UserException;
|
||||
import org.apache.doris.common.util.DebugUtil;
|
||||
import org.apache.doris.common.util.Util;
|
||||
import org.apache.doris.datasource.ExternalCatalog;
|
||||
import org.apache.doris.datasource.JdbcExternalCatalog;
|
||||
import org.apache.doris.mysql.privilege.PrivPredicate;
|
||||
import org.apache.doris.planner.DataPartition;
|
||||
import org.apache.doris.planner.DataSink;
|
||||
@ -110,7 +114,7 @@ public class InsertStmt extends DdlStmt {
|
||||
|
||||
private Table targetTable;
|
||||
|
||||
private Database db;
|
||||
private DatabaseIf db;
|
||||
private long transactionId;
|
||||
|
||||
// we need a new TupleDesc for olap table.
|
||||
@ -191,12 +195,15 @@ public class InsertStmt extends DdlStmt {
|
||||
// get dbs of statement
|
||||
queryStmt.getTables(analyzer, false, tableMap, parentViewNameSet);
|
||||
tblName.analyze(analyzer);
|
||||
// disallow external catalog
|
||||
Util.prohibitExternalCatalog(tblName.getCtl(), this.getClass().getSimpleName());
|
||||
// disallow external catalog except JdbcExternalCatalog
|
||||
if (analyzer.getEnv().getCurrentCatalog() instanceof ExternalCatalog
|
||||
&& !(analyzer.getEnv().getCurrentCatalog() instanceof JdbcExternalCatalog)) {
|
||||
Util.prohibitExternalCatalog(tblName.getCtl(), this.getClass().getSimpleName());
|
||||
}
|
||||
String dbName = tblName.getDb();
|
||||
String tableName = tblName.getTbl();
|
||||
// check exist
|
||||
DatabaseIf db = analyzer.getEnv().getInternalCatalog().getDbOrAnalysisException(dbName);
|
||||
DatabaseIf db = analyzer.getEnv().getCatalogMgr().getCatalog(tblName.getCtl()).getDbOrAnalysisException(dbName);
|
||||
TableIf table = db.getTableOrAnalysisException(tblName.getTbl());
|
||||
|
||||
// check access
|
||||
@ -247,7 +254,7 @@ public class InsertStmt extends DdlStmt {
|
||||
return dataSink;
|
||||
}
|
||||
|
||||
public Database getDbObj() {
|
||||
public DatabaseIf getDbObj() {
|
||||
return db;
|
||||
}
|
||||
|
||||
@ -261,8 +268,11 @@ public class InsertStmt extends DdlStmt {
|
||||
|
||||
if (targetTable == null) {
|
||||
tblName.analyze(analyzer);
|
||||
// disallow external catalog
|
||||
Util.prohibitExternalCatalog(tblName.getCtl(), this.getClass().getSimpleName());
|
||||
// disallow external catalog except JdbcExternalCatalog
|
||||
if (analyzer.getEnv().getCurrentCatalog() instanceof ExternalCatalog
|
||||
&& !(analyzer.getEnv().getCurrentCatalog() instanceof JdbcExternalCatalog)) {
|
||||
Util.prohibitExternalCatalog(tblName.getCtl(), this.getClass().getSimpleName());
|
||||
}
|
||||
}
|
||||
|
||||
// Check privilege
|
||||
@ -292,8 +302,7 @@ public class InsertStmt extends DdlStmt {
|
||||
// create data sink
|
||||
createDataSink();
|
||||
|
||||
db = analyzer.getEnv().getInternalCatalog().getDbOrAnalysisException(tblName.getDb());
|
||||
|
||||
db = analyzer.getEnv().getCatalogMgr().getCatalog(tblName.getCtl()).getDbOrAnalysisException(tblName.getDb());
|
||||
// create label and begin transaction
|
||||
long timeoutSecond = ConnectContext.get().getSessionVariable().getQueryTimeoutS();
|
||||
if (Strings.isNullOrEmpty(label)) {
|
||||
@ -322,8 +331,16 @@ public class InsertStmt extends DdlStmt {
|
||||
private void analyzeTargetTable(Analyzer analyzer) throws AnalysisException {
|
||||
// Get table
|
||||
if (targetTable == null) {
|
||||
DatabaseIf db = Env.getCurrentInternalCatalog().getDbOrAnalysisException(tblName.getDb());
|
||||
targetTable = (Table) db.getTableOrAnalysisException(tblName.getTbl());
|
||||
DatabaseIf db = analyzer.getEnv().getCatalogMgr()
|
||||
.getCatalog(tblName.getCtl()).getDbOrAnalysisException(tblName.getDb());
|
||||
if (db instanceof Database) {
|
||||
targetTable = (Table) db.getTableOrAnalysisException(tblName.getTbl());
|
||||
} else if (db instanceof JdbcExternalDatabase) {
|
||||
JdbcExternalTable jdbcTable = (JdbcExternalTable) db.getTableOrAnalysisException(tblName.getTbl());
|
||||
targetTable = jdbcTable.getJdbcTable();
|
||||
} else {
|
||||
throw new AnalysisException("Not support insert target table.");
|
||||
}
|
||||
}
|
||||
|
||||
if (targetTable instanceof OlapTable) {
|
||||
|
||||
@ -18,6 +18,7 @@
|
||||
package org.apache.doris.transaction;
|
||||
|
||||
import org.apache.doris.catalog.Database;
|
||||
import org.apache.doris.catalog.DatabaseIf;
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.catalog.MaterializedIndex;
|
||||
import org.apache.doris.catalog.OlapTable;
|
||||
@ -664,7 +665,7 @@ public class DatabaseTransactionMgr {
|
||||
LOG.info("transaction:[{}] successfully committed", transactionState);
|
||||
}
|
||||
|
||||
public boolean waitForTransactionFinished(Database db, long transactionId, long timeoutMillis)
|
||||
public boolean waitForTransactionFinished(DatabaseIf db, long transactionId, long timeoutMillis)
|
||||
throws TransactionCommitFailedException {
|
||||
TransactionState transactionState = null;
|
||||
readLock();
|
||||
|
||||
@ -18,6 +18,7 @@
|
||||
package org.apache.doris.transaction;
|
||||
|
||||
import org.apache.doris.catalog.Database;
|
||||
import org.apache.doris.catalog.DatabaseIf;
|
||||
import org.apache.doris.catalog.Env;
|
||||
import org.apache.doris.catalog.Table;
|
||||
import org.apache.doris.common.AnalysisException;
|
||||
@ -243,13 +244,13 @@ public class GlobalTransactionMgr implements Writable {
|
||||
dbTransactionMgr.commitTransaction(null, transactionId, null, null, true);
|
||||
}
|
||||
|
||||
public boolean commitAndPublishTransaction(Database db, List<Table> tableList, long transactionId,
|
||||
public boolean commitAndPublishTransaction(DatabaseIf db, List<Table> tableList, long transactionId,
|
||||
List<TabletCommitInfo> tabletCommitInfos, long timeoutMillis)
|
||||
throws UserException {
|
||||
return commitAndPublishTransaction(db, tableList, transactionId, tabletCommitInfos, timeoutMillis, null);
|
||||
}
|
||||
|
||||
public boolean commitAndPublishTransaction(Database db, List<Table> tableList, long transactionId,
|
||||
public boolean commitAndPublishTransaction(DatabaseIf db, List<Table> tableList, long transactionId,
|
||||
List<TabletCommitInfo> tabletCommitInfos, long timeoutMillis,
|
||||
TxnCommitAttachment txnCommitAttachment)
|
||||
throws UserException {
|
||||
|
||||
Reference in New Issue
Block a user