From baf85547aeb7b3857a51b833f3d6c8152991af36 Mon Sep 17 00:00:00 2001 From: Mingyu Chen Date: Fri, 8 Dec 2023 23:06:05 +0800 Subject: [PATCH] [feature](jdbc) support call function to pass sql directly to jdbc catalog #26492 Support a new stmt in Nereids: `CALL EXECUTE_STMT("jdbc", "stmt")` So that we can pass the origin stmt directly to the datasource of a jdbc catalog. show case: ``` mysql> select * from mysql_catalog.db1.tbl1; +------+------+ | k1 | k2 | +------+------+ | 111 | 222 | +------+------+ 1 row in set (0.63 sec) mysql> call execute("mysql_catalog", "insert into db1.tbl1 values(1,'abc')"); Query OK, 0 rows affected (0.01 sec) mysql> select * from mysql_catalog.db1.tbl1; +------+------+ | k1 | k2 | +------+------+ | 111 | 222 | | 1 | abc | +------+------+ 2 rows in set (0.03 sec) mysql> call execute_stmt("mysql_catalog", "delete from db1.tbl1 where k1=111"); Query OK, 0 rows affected (0.01 sec) mysql> select * from mysql_catalog.db1.tbl1; +------+------+ | k1 | k2 | +------+------+ | 1 | abc | +------+------+ 1 row in set (0.03 sec) ``` --- docs/en/docs/lakehouse/multi-catalog/jdbc.md | 170 +++++++++++------- .../docs/lakehouse/multi-catalog/jdbc.md | 170 +++++++++++------- .../org/apache/doris/nereids/DorisLexer.g4 | 1 + .../org/apache/doris/nereids/DorisParser.g4 | 3 + .../datasource/jdbc/JdbcExternalCatalog.java | 9 + .../datasource/jdbc/client/JdbcClient.java | 19 ++ .../nereids/parser/LogicalPlanBuilder.java | 11 ++ .../doris/nereids/trees/plans/PlanType.java | 3 +- .../trees/plans/commands/CallCommand.java | 59 ++++++ .../commands/call/CallExecuteStmtFunc.java | 102 +++++++++++ .../trees/plans/commands/call/CallFunc.java | 42 +++++ .../trees/plans/visitor/CommandVisitor.java | 5 + .../external_table_p0/jdbc/test_jdbc_call.out | 30 ++++ .../jdbc/test_jdbc_call.groovy | 131 ++++++++++++++ 14 files changed, 618 insertions(+), 137 deletions(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CallCommand.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/call/CallExecuteStmtFunc.java create mode 100644 fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/call/CallFunc.java create mode 100644 regression-test/data/external_table_p0/jdbc/test_jdbc_call.out create mode 100644 regression-test/suites/external_table_p0/jdbc/test_jdbc_call.groovy diff --git a/docs/en/docs/lakehouse/multi-catalog/jdbc.md b/docs/en/docs/lakehouse/multi-catalog/jdbc.md index b31bd7f99f..971eb316eb 100644 --- a/docs/en/docs/lakehouse/multi-catalog/jdbc.md +++ b/docs/en/docs/lakehouse/multi-catalog/jdbc.md @@ -157,6 +157,108 @@ The transaction mechanism ensures the atomicity of data writing to JDBC External ## Guide +### View the JDBC Catalog + +You can query all Catalogs in the current Doris cluster through SHOW CATALOGS: + +```sql +SHOW CATALOGS; +``` + +Query the creation statement of a Catalog through SHOW CREATE CATALOG: + +```sql +SHOW CREATE CATALOG ; +``` + +### Drop the JDBC Catalog + +A Catalog can be deleted via DROP CATALOG: + +```sql +DROP CATALOG ; +``` + +### Query the JDBC Catalog + +1. Use SWITCH to switch the Catalog in effect for the current session: + + ```sql + SWITCH ; + ``` + +2. Query all libraries under the current Catalog through SHOW DATABASES: + + ```sql + SHOW DATABASES FROM ; + ``` + + ```sql + SHOW DATABASES; + ``` + +3. Use USE to switch the Database that takes effect in the current session: + + ```sql + USE ; + ``` + + Or directly use `USE .;` to switch the Database that takes effect in the current session + +4. Query all tables under the current Catalog through SHOW TABLES: + + ```sql + SHOW TABLES FROM .; + ``` + + ```sql + SHOW TABLES FROM ; + ``` + + ```sql + SHOW TABLES; + ``` + +5. Query the data of a table under the current Catalog through SELECT: + + ```sql + SELECT * FROM ; + ``` + +## SQL Passthrough + +In versions prior to Doris 2.0.3, users could only perform query operations (SELECT) through the JDBC Catalog. +Starting from version Doris 2.0.4, users can perform DDL (Data Definition Language) and DML (Data Manipulation Language) operations on JDBC data sources using the `CALL` command. + +``` +CALL EXECUTE_STMT("catalog_name", "raw_stmt_string"); +``` + +The `EXECUTE_STMT()` procedure involves two parameters: + +- Catalog Name: Currently, only the Jdbc Catalog is supported. +- Execution Statement: Currently, only DDL and DML statements are supported. These statements must use the syntax specific to the JDBC data source. + +``` +CALL EXECUTE_STMT("jdbc_catalog", "insert into db1.tbl1 values(1,2), (3, 4)"); + +CALL EXECUTE_STMT("jdbc_catalog", "delete from db1.tbl1 where k1 = 2"); + +CALL EXECUTE_STMT("jdbc_catalog", "create table db1.tbl2 (k1 int)"); +``` + +### Principles and Limitations + +Through the `CALL EXECUTE_STMT()` command, Doris directly sends the SQL statements written by the user to the JDBC data source associated with the Catalog for execution. Therefore, this operation has the following limitations: + +- The SQL statements must be in the syntax specific to the data source, as Doris does not perform syntax and semantic checks. +- It is recommended that table names in SQL statements be fully qualified, i.e., in the `db.tbl` format. If the `db` is not specified, the db name specified in the JDBC Catalog's JDBC URL will be used. +- SQL statements cannot reference tables outside of the JDBC data source, nor can they reference Doris's tables. However, they can reference tables within the JDBC data source that have not been synchronized to the Doris JDBC Catalog. +- When executing DML statements, it is not possible to obtain the number of rows inserted, updated, or deleted; success of the command execution can only be confirmed. +- Only users with LOAD permissions on the Catalog can execute this command. + +## Supported Datasoures + ### MySQL #### Example @@ -602,74 +704,6 @@ CREATE CATALOG jdbc_oceanbase PROPERTIES ( When Doris connects to OceanBase, it will automatically recognize that OceanBase is in MySQL or Oracle mode. Hierarchical correspondence and type mapping refer to [MySQL](#MySQL) and [Oracle](#Oracle) ::: -### View the JDBC Catalog - -You can query all Catalogs in the current Doris cluster through SHOW CATALOGS: - -```sql -SHOW CATALOGS; -``` - -Query the creation statement of a Catalog through SHOW CREATE CATALOG: - -```sql -SHOW CREATE CATALOG ; -``` - -### Drop the JDBC Catalog - -A Catalog can be deleted via DROP CATALOG: - -```sql -DROP CATALOG ; -``` - -### Query the JDBC Catalog - -1. Use SWITCH to switch the Catalog in effect for the current session: - - ```sql - SWITCH ; - ``` - -2. Query all libraries under the current Catalog through SHOW DATABASES: - - ```sql - SHOW DATABASES FROM ; - ``` - - ```sql - SHOW DATABASES; - ``` - -3. Use USE to switch the Database that takes effect in the current session: - - ```sql - USE ; - ``` - - Or directly use `USE .;` to switch the Database that takes effect in the current session - -4. Query all tables under the current Catalog through SHOW TABLES: - - ```sql - SHOW TABLES FROM .; - ``` - - ```sql - SHOW TABLES FROM ; - ``` - - ```sql - SHOW TABLES; - ``` - -5. Query the data of a table under the current Catalog through SELECT: - - ```sql - SELECT * FROM ; - ``` - ## JDBC Drivers It is recommended to use the following versions of Driver to connect to the corresponding database. Other versions of the Driver have not been tested and may cause unexpected problems. diff --git a/docs/zh-CN/docs/lakehouse/multi-catalog/jdbc.md b/docs/zh-CN/docs/lakehouse/multi-catalog/jdbc.md index 48fb1f253f..2c2e084ab7 100644 --- a/docs/zh-CN/docs/lakehouse/multi-catalog/jdbc.md +++ b/docs/zh-CN/docs/lakehouse/multi-catalog/jdbc.md @@ -157,6 +157,108 @@ set enable_odbc_transcation = true; ## 使用指南 +### 查看 JDBC Catalog + +可以通过 SHOW CATALOGS 查询当前所在 Doris 集群里所有 Catalog: + +```sql +SHOW CATALOGS; +``` + +通过 SHOW CREATE CATALOG 查询某个 Catalog 的创建语句: + +```sql +SHOW CREATE CATALOG ; +``` + +### 删除 JDBC Catalog + +可以通过 DROP CATALOG 删除某个 Catalog: + +```sql +DROP CATALOG ; +``` + +### 查询 JDBC Catalog + +1. 通过 SWITCH 切换当前会话生效的 Catalog: + + ```sql + SWITCH ; + ``` + +2. 通过 SHOW DATABASES 查询当前 Catalog 下的所有库: + + ```sql + SHOW DATABASES FROM ; + ``` + + ```sql + SHOW DATABASES; + ``` + +3. 通过 USE 切换当前会话生效的 Database: + + ```sql + USE ; + ``` + + 或者直接通过 `USE .;` 切换当前会话生效的 Database + +4. 通过 SHOW TABLES 查询当前 Catalog 下的所有表: + + ```sql + SHOW TABLES FROM .; + ``` + + ```sql + SHOW TABLES FROM ; + ``` + + ```sql + SHOW TABLES; + ``` + +5. 通过 SELECT 查询当前 Catalog 下的某个表的数据: + + ```sql + SELECT * FROM ; + ``` + +### SQL 透传 + +在 Doris 2.0.3 之前的版本中,用户只能通过 JDBC Catalog 进行查询操作(SELECT)。 +在 Doris 2.0.4 版本之后,用户可以通过 `CALL` 命令,对 JDBC 数据源进行 DDL 和 DML 操作。 + +``` +CALL EXECUTE_STMT("catalog_name", "raw_stmt_string"); +``` + +`EXECUTE_STMT()` 过程有两个参数: + +- Catalog Name:目前仅支持 Jdbc Catalog。 +- 执行语句:目前仅支持 DDL 和 DML 语句。并且需要直接使用 JDBC 数据源对应的语法。 + +``` +CALL EXECUTE_STMT("jdbc_catalog", "insert into db1.tbl1 values(1,2), (3, 4)"); + +CALL EXECUTE_STMT(jdbc_catalog", "delete from db1.tbl1 where k1 = 2"); + +CALL EXECUTE_STMT(jdbc_catalog", "create table dbl1.tbl2 (k1 int)"); +``` + +#### 原理和限制 + +通过 `CALL EXECUTE_STMT()` 命令,Doris 会直接将用户编写的 SQL 语句发送给 Catalog 对应的 JDBC 数据源进行执行。因此,这个操作有如下限制: + +- SQL 语句必须是数据源对应的语法,Doris 不会做语法和语义检查。 +- SQL 语句中引用的表名建议是全限定名,即 `db.tbl` 这种格式。如果未指定 db,则会使用 JDBC Catalog 的 JDBC url 中指定的 db 名称。 +- SQL 语句中不可引用 JDBC 数据源之外的库表,也不可以引用 Doris 的库表。但可以引用在 JDBC 数据源内的,但是没有同步到 Doris JDBC Catalog 的库表。 +- 执行 DML 语句,无法获取插入、更新或删除的行数,只能获取命令是否执行成功。 +- 只有对 Catalog 有 LOAD 权限的用户,才能执行这个命令。 + +## 支持的数据源 + ### MySQL #### 创建示例 @@ -603,74 +705,6 @@ CREATE CATALOG jdbc_oceanbase PROPERTIES ( Doris 在连接 OceanBase 时,会自动识别 OceanBase 处于 MySQL 或者 Oracle 模式,层级对应和类型映射参考 [MySQL](#MySQL) 与 [Oracle](#Oracle) ::: -### 查看 JDBC Catalog - -可以通过 SHOW CATALOGS 查询当前所在 Doris 集群里所有 Catalog: - -```sql -SHOW CATALOGS; -``` - -通过 SHOW CREATE CATALOG 查询某个 Catalog 的创建语句: - -```sql -SHOW CREATE CATALOG ; -``` - -### 删除 JDBC Catalog - -可以通过 DROP CATALOG 删除某个 Catalog: - -```sql -DROP CATALOG ; -``` - -### 查询 JDBC Catalog - -1. 通过 SWITCH 切换当前会话生效的 Catalog: - - ```sql - SWITCH ; - ``` - -2. 通过 SHOW DATABASES 查询当前 Catalog 下的所有库: - - ```sql - SHOW DATABASES FROM ; - ``` - - ```sql - SHOW DATABASES; - ``` - -3. 通过 USE 切换当前会话生效的 Database: - - ```sql - USE ; - ``` - - 或者直接通过 `USE .;` 切换当前会话生效的 Database - -4. 通过 SHOW TABLES 查询当前 Catalog 下的所有表: - - ```sql - SHOW TABLES FROM .; - ``` - - ```sql - SHOW TABLES FROM ; - ``` - - ```sql - SHOW TABLES; - ``` - -5. 通过 SELECT 查询当前 Catalog 下的某个表的数据: - - ```sql - SELECT * FROM ; - ``` - ## JDBC Driver 列表 推荐使用以下版本的 Driver 连接对应的数据库。其他版本的 Driver 未经测试,可能导致非预期的问题。 diff --git a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisLexer.g4 b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisLexer.g4 index f3e7e8c583..fe07122f03 100644 --- a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisLexer.g4 +++ b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisLexer.g4 @@ -136,6 +136,7 @@ BUILD: 'BUILD'; BUILTIN: 'BUILTIN'; BY: 'BY'; CACHED: 'CACHED'; +CALL: 'CALL'; CANCEL: 'CANCEL'; CASE: 'CASE'; CAST: 'CAST'; diff --git a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 index 9b47e799e7..ed81a27742 100644 --- a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 +++ b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 @@ -101,6 +101,7 @@ statement constraint #addConstraint | ALTER TABLE table=relation DROP CONSTRAINT constraintName=errorCapturingIdentifier #dropConstraint + | CALL functionName=identifier LEFT_PAREN (expression (COMMA expression)*)? RIGHT_PAREN #callProcedure ; constraint @@ -117,6 +118,7 @@ partitionSpec // TODO: support analyze external table partition spec https://github.com/apache/doris/pull/24154 // | PARTITIONS LEFT_PAREN ASTERISK RIGHT_PAREN // | PARTITIONS WITH RECENT + LEFT_PAREN referenceSlots+=errorCapturingIdentifier (COMMA referenceSlots+=errorCapturingIdentifier)* RIGHT_PAREN ; dataDesc @@ -906,6 +908,7 @@ nonReserved | BUILD | BUILTIN | CACHED + | CALL | CATALOG | CATALOGS | CHAIN diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java index df1d442b9d..8f2446cfc2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java @@ -203,4 +203,13 @@ public class JdbcExternalCatalog extends ExternalCatalog { } } } + + /** + * Execute stmt direct via jdbc + * @param stmt, the raw stmt string + */ + public void executeStmt(String stmt) { + makeSureInitialized(); + jdbcClient.executeStmt(stmt); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClient.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClient.java index b4e2604bfc..b7f4fd4955 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClient.java @@ -467,6 +467,24 @@ public abstract class JdbcClient { return databaseMetaData.getColumns(catalogName, schemaName, tableName, null); } + /** + * Execute stmt direct via jdbc + * @param origStmt, the raw stmt string + */ + public void executeStmt(String origStmt) { + Connection conn = getConnection(); + Statement stmt = null; + try { + stmt = conn.createStatement(); + int effectedRows = stmt.executeUpdate(origStmt); + LOG.debug("finished to execute dml stmt: {}, effected rows: {}", origStmt, effectedRows); + } catch (SQLException e) { + throw new JdbcClientException("Failed to execute stmt. error: " + e.getMessage(), e); + } finally { + close(stmt, conn); + } + } + @Data protected static class JdbcFieldSchema { protected String columnName; @@ -501,3 +519,4 @@ public abstract class JdbcClient { return ScalarType.createStringType(); } } + diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java index dc60b90b8c..cf9d591127 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java @@ -52,6 +52,7 @@ import org.apache.doris.nereids.DorisParser.BooleanLiteralContext; import org.apache.doris.nereids.DorisParser.BracketJoinHintContext; import org.apache.doris.nereids.DorisParser.BracketRelationHintContext; import org.apache.doris.nereids.DorisParser.BuildModeContext; +import org.apache.doris.nereids.DorisParser.CallProcedureContext; import org.apache.doris.nereids.DorisParser.CollateContext; import org.apache.doris.nereids.DorisParser.ColumnDefContext; import org.apache.doris.nereids.DorisParser.ColumnDefsContext; @@ -318,6 +319,7 @@ import org.apache.doris.nereids.trees.plans.algebra.SetOperation.Qualifier; import org.apache.doris.nereids.trees.plans.commands.AddConstraintCommand; import org.apache.doris.nereids.trees.plans.commands.AlterMTMVCommand; import org.apache.doris.nereids.trees.plans.commands.BatchInsertIntoTableCommand; +import org.apache.doris.nereids.trees.plans.commands.CallCommand; import org.apache.doris.nereids.trees.plans.commands.Command; import org.apache.doris.nereids.trees.plans.commands.Constraint; import org.apache.doris.nereids.trees.plans.commands.CreateMTMVCommand; @@ -2998,4 +3000,13 @@ public class LogicalPlanBuilder extends DorisParserBaseVisitor { return new TableSample(rows, false, seek); } + @Override + public Object visitCallProcedure(CallProcedureContext ctx) { + String functionName = ctx.functionName.getText(); + List arguments = ctx.expression().stream() + .map(this::typedVisit) + .collect(ImmutableList.toImmutableList()); + UnboundFunction unboundFunction = new UnboundFunction(functionName, arguments); + return new CallCommand(unboundFunction); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java index f244bf32b2..d75115a648 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java @@ -132,5 +132,6 @@ public enum PlanType { ADD_CONSTRAINT_COMMAND, DROP_CONSTRAINT_COMMAND, REFRESH_MTMV_COMMAND, - DROP_MTMV_COMMAND + DROP_MTMV_COMMAND, + CALL_COMMAND } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CallCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CallCommand.java new file mode 100644 index 0000000000..0042e65be6 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CallCommand.java @@ -0,0 +1,59 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.plans.commands; + +import org.apache.doris.nereids.analyzer.UnboundFunction; +import org.apache.doris.nereids.trees.plans.PlanType; +import org.apache.doris.nereids.trees.plans.commands.call.CallFunc; +import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.StmtExecutor; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.Objects; + +/** + * call func() + */ +public class CallCommand extends Command implements ForwardWithSync { + public static final Logger LOG = LogManager.getLogger(CallCommand.class); + + private final UnboundFunction unboundFunction; + + /** + * constructor + */ + public CallCommand(UnboundFunction unboundFunction) { + super(PlanType.CALL_COMMAND); + this.unboundFunction = Objects.requireNonNull(unboundFunction, "function is null"); + } + + @Override + public void run(ConnectContext ctx, StmtExecutor executor) throws Exception { + CallFunc analyzedFunc = CallFunc.getFunc(ctx.getCurrentUserIdentity(), unboundFunction); + analyzedFunc.run(); + } + + @Override + public R accept(PlanVisitor visitor, C context) { + return visitor.visitCallCommand(this, context); + } + +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/call/CallExecuteStmtFunc.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/call/CallExecuteStmtFunc.java new file mode 100644 index 0000000000..1e36915c11 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/call/CallExecuteStmtFunc.java @@ -0,0 +1,102 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.plans.commands.call; + +import org.apache.doris.analysis.UserIdentity; +import org.apache.doris.catalog.Env; +import org.apache.doris.datasource.CatalogIf; +import org.apache.doris.datasource.jdbc.JdbcExternalCatalog; +import org.apache.doris.mysql.privilege.PrivPredicate; +import org.apache.doris.nereids.exceptions.AnalysisException; +import org.apache.doris.nereids.trees.expressions.Expression; +import org.apache.doris.nereids.trees.expressions.literal.Literal; + +import java.util.List; +import java.util.Objects; + +/** + * EXECUTE_STMT("catalog_name", "stmt") + */ +public class CallExecuteStmtFunc extends CallFunc { + + private final UserIdentity user; + private final String catalogName; + private final String stmt; + + private CallExecuteStmtFunc(UserIdentity user, String catalogName, String stmt) { + this.user = Objects.requireNonNull(user, "user is missing"); + this.catalogName = Objects.requireNonNull(catalogName, "catalogName is missing"); + this.stmt = Objects.requireNonNull(stmt, "stmt is missing"); + } + + /** + * Create a CallFunc + */ + public static CallFunc create(UserIdentity user, List args) { + if (args.size() != 2) { + throw new AnalysisException("EXECUTE_STMT function must have 2 arguments: 'catalog name' and 'stmt'"); + } + + String catalogName = null; + String stmt = null; + for (int i = 0; i < args.size(); i++) { + if (!(args.get(i).isConstant())) { + throw new AnalysisException("Argument of EXECUTE_STMT function must be constant string"); + } + if (!(args.get(i) instanceof Literal)) { + throw new AnalysisException("Argument of EXECUTE_STMT function must be constant string"); + } + Literal literal = (Literal) args.get(i); + if (!literal.isStringLikeLiteral()) { + throw new AnalysisException("Argument of EXECUTE_STMT function must be constant string"); + } + + String rawString = literal.getStringValue(); + switch (i) { + case 0: + catalogName = rawString; + break; + case 1: + stmt = rawString; + break; + default: + throw new AnalysisException("EXECUTE_STMT function must have 2 arguments"); + } + } + return new CallExecuteStmtFunc(user, catalogName, stmt); + } + + @Override + public void run() { + CatalogIf catalogIf = Env.getCurrentEnv().getCatalogMgr().getCatalog(catalogName); + if (catalogIf == null) { + throw new AnalysisException("catalog not found: " + catalogName); + } + if (!(catalogIf instanceof JdbcExternalCatalog)) { + throw new AnalysisException("Only support JDBC catalog"); + } + + // check priv + if (!Env.getCurrentEnv().getAuth().checkCtlPriv(user, catalogName, PrivPredicate.LOAD)) { + throw new AnalysisException("user " + user + " has no privilege to execute stmt in catalog " + catalogName); + } + + JdbcExternalCatalog catalog = (JdbcExternalCatalog) catalogIf; + catalog.executeStmt(stmt); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/call/CallFunc.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/call/CallFunc.java new file mode 100644 index 0000000000..a9ba819951 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/call/CallFunc.java @@ -0,0 +1,42 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.plans.commands.call; + +import org.apache.doris.analysis.UserIdentity; +import org.apache.doris.nereids.analyzer.UnboundFunction; + +/** + * call function + */ +public abstract class CallFunc { + + /** + * Get the instance of CallFunc + */ + public static CallFunc getFunc(UserIdentity user, UnboundFunction unboundFunction) { + String funcName = unboundFunction.getName().toUpperCase(); + switch (funcName) { + case "EXECUTE_STMT": + return CallExecuteStmtFunc.create(user, unboundFunction.getArguments()); + default: + throw new IllegalArgumentException("unknown function name: " + funcName); + } + } + + public abstract void run(); +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java index f4687e1d25..c67606b0ea 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java @@ -20,6 +20,7 @@ package org.apache.doris.nereids.trees.plans.visitor; import org.apache.doris.nereids.trees.plans.commands.AddConstraintCommand; import org.apache.doris.nereids.trees.plans.commands.AlterMTMVCommand; import org.apache.doris.nereids.trees.plans.commands.BatchInsertIntoTableCommand; +import org.apache.doris.nereids.trees.plans.commands.CallCommand; import org.apache.doris.nereids.trees.plans.commands.Command; import org.apache.doris.nereids.trees.plans.commands.CreateMTMVCommand; import org.apache.doris.nereids.trees.plans.commands.CreatePolicyCommand; @@ -106,4 +107,8 @@ public interface CommandVisitor { default R visitDropMTMVCommand(DropMTMVCommand dropMTMVCommand, C context) { return visitCommand(dropMTMVCommand, context); } + + default R visitCallCommand(CallCommand callCommand, C context) { + return visitCommand(callCommand, context); + } } diff --git a/regression-test/data/external_table_p0/jdbc/test_jdbc_call.out b/regression-test/data/external_table_p0/jdbc/test_jdbc_call.out new file mode 100644 index 0000000000..43c1b726a4 --- /dev/null +++ b/regression-test/data/external_table_p0/jdbc/test_jdbc_call.out @@ -0,0 +1,30 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !sql1 -- +1 2 + +-- !sql2 -- +1 2 +3 4 + +-- !sql3 -- +1 2 +3 4 + +-- !sql4 -- +3 4 + +-- !sql5 -- +3 4 + +-- !sql6 -- +5 6 +7 8 + +-- !sql7 -- +5 6 +7 8 + +-- !sql8 -- +3 4 +9 10 + diff --git a/regression-test/suites/external_table_p0/jdbc/test_jdbc_call.groovy b/regression-test/suites/external_table_p0/jdbc/test_jdbc_call.groovy new file mode 100644 index 0000000000..43bcdadfba --- /dev/null +++ b/regression-test/suites/external_table_p0/jdbc/test_jdbc_call.groovy @@ -0,0 +1,131 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_jdbc_call", "p0,external,doris,external_docker,external_docker_doris") { + String jdbcUrl = context.config.jdbcUrl + "&sessionVariables=return_object_data_as_binary=true" + String jdbcUser = context.config.jdbcUser + String jdbcPassword = context.config.jdbcPassword + String s3_endpoint = getS3Endpoint() + String bucket = getS3BucketName() + String driver_url = "https://${bucket}.${s3_endpoint}/regression/jdbc_driver/mysql-connector-java-8.0.25.jar" + + String catalog_name = "jdbc_call"; + String non_jdbc_catalog_name = "non_jdbc_catalog"; + String internal_db_name = "jdbc_call_db"; + String internal_tbl_name = "jdbc_call_tbl"; + String internal_tbl_name2 = "jdbc_call_tbl2"; + + sql """set enable_nereids_planner=true;""" + sql """set enable_fallback_to_original_planner=false;""" + + // 1. create internal db/tbl + sql """drop database if exists ${internal_db_name};""" + sql """create database if not exists ${internal_db_name}; """ + + sql """create table ${internal_db_name}.${internal_tbl_name} (k1 int, k2 int) distributed by hash(k1) buckets 1 properties("replication_num" = "1");""" + sql """insert into ${internal_db_name}.${internal_tbl_name} values(1, 2);""" + + // 2. create catalog + sql """drop catalog if exists ${catalog_name} """ + sql """ CREATE CATALOG `${catalog_name}` PROPERTIES ( + "user" = "${jdbcUser}", + "type" = "jdbc", + "password" = "${jdbcPassword}", + "jdbc_url" = "${jdbcUrl}", + "driver_url" = "${driver_url}", + "driver_class" = "com.mysql.cj.jdbc.Driver" + )""" + + sql """drop catalog if exists ${non_jdbc_catalog_name}""" + sql """create catalog if not exists ${non_jdbc_catalog_name} properties ( + "type"="hms", + 'hive.metastore.uris' = 'thrift://127.0.0.1:9083' + );""" + + + // 3. execute call + order_qt_sql1 """select * from ${catalog_name}.${internal_db_name}.${internal_tbl_name}"""; + + test { + sql """call execute_stmt()""" + exception "EXECUTE_STMT function must have 2 arguments" + } + + test { + sql """call execute_stmt("jdbc")""" + exception "EXECUTE_STMT function must have 2 arguments" + } + + test { + sql """call execute_stmt("${non_jdbc_catalog_name}", "select 1")""" + exception "Only support JDBC catalog" + } + + test { + sql """call execute_stmt("xxx", "select 1")""" + exception "catalog not found: xxx" + } + + test { + sql """call execute_stmt("${catalog_name}", "select 1")""" + exception "Can not issue SELECT via executeUpdate() or executeLargeUpdate()" + } + + // execute insert + sql """call execute_stmt("${catalog_name}", "insert into ${internal_db_name}.${internal_tbl_name} values (3, 4)")""" + order_qt_sql2 """select * from ${catalog_name}.${internal_db_name}.${internal_tbl_name}""" + order_qt_sql3 """select * from internal.${internal_db_name}.${internal_tbl_name}""" + + // execute delete + sql """call execute_stmt("${catalog_name}", "delete from ${internal_db_name}.${internal_tbl_name} where k1 = 1")""" + order_qt_sql4 """select * from ${catalog_name}.${internal_db_name}.${internal_tbl_name}""" + order_qt_sql5 """select * from internal.${internal_db_name}.${internal_tbl_name}""" + + // execute create table and insert + sql """call execute_stmt("${catalog_name}", "create table ${internal_db_name}.${internal_tbl_name2} (c1 int, c2 int) distributed by hash(c1) buckets 1 properties('replication_num' = '1')")""" + sql """refresh catalog ${catalog_name}""" + sql """call execute_stmt("${catalog_name}", "insert into ${internal_db_name}.${internal_tbl_name2} values (5, 6), (7, 8)")""" + order_qt_sql6 """select * from ${catalog_name}.${internal_db_name}.${internal_tbl_name2}""" + order_qt_sql7 """select * from internal.${internal_db_name}.${internal_tbl_name2}""" + + // test priv + // only user with load priv can execute call + String user1 = "normal_jdbc_user"; + String user2 = "load_jdbc_user"; + sql """drop user if exists ${user1}"""; + sql """create user ${user1}"""; + sql """grant select_priv on *.*.* to ${user1}""" + sql """drop user if exists ${user2}"""; + sql """create user ${user2}"""; + sql """grant load_priv, select_priv on *.*.* to ${user2}""" + + def result1 = connect(user="${user1}", password="", url=context.config.jdbcUrl) { + sql """set enable_nereids_planner=true;""" + sql """set enable_fallback_to_original_planner=false;""" + test { + sql """call execute_stmt("${catalog_name}", "insert into ${internal_db_name}.${internal_tbl_name} values (3, 4)")""" + exception """has no privilege to execute stmt in catalog""" + } + } + + def result2 = connect(user="${user2}", password="", url=context.config.jdbcUrl) { + sql """set enable_nereids_planner=true;""" + sql """set enable_fallback_to_original_planner=false;""" + sql """call execute_stmt("${catalog_name}", "insert into ${internal_db_name}.${internal_tbl_name} values (9, 10)")""" + order_qt_sql8 """select * from internal.${internal_db_name}.${internal_tbl_name}""" + } +}