[feature](jdbc) support call function to pass sql directly to jdbc catalog #26492

Support a new stmt in Nereids:
`CALL EXECUTE_STMT("jdbc", "stmt")`

So that we can pass the origin stmt directly to the datasource of a jdbc catalog.

show case:
```
mysql> select * from mysql_catalog.db1.tbl1;
+------+------+
| k1   | k2   |
+------+------+
|  111 | 222  |
+------+------+
1 row in set (0.63 sec)

mysql> call execute("mysql_catalog", "insert into db1.tbl1 values(1,'abc')");
Query OK, 0 rows affected (0.01 sec)

mysql> select * from mysql_catalog.db1.tbl1;
+------+------+
| k1   | k2   |
+------+------+
|  111 | 222  |
|    1 | abc  |
+------+------+
2 rows in set (0.03 sec)

mysql> call execute_stmt("mysql_catalog", "delete from db1.tbl1 where k1=111");
Query OK, 0 rows affected (0.01 sec)

mysql> select * from mysql_catalog.db1.tbl1;
+------+------+
| k1   | k2   |
+------+------+
|    1 | abc  |
+------+------+
1 row in set (0.03 sec)
```
This commit is contained in:
Mingyu Chen
2023-12-08 23:06:05 +08:00
committed by GitHub
parent 2b914aebb6
commit baf85547ae
14 changed files with 618 additions and 137 deletions

View File

@ -203,4 +203,13 @@ public class JdbcExternalCatalog extends ExternalCatalog {
}
}
}
/**
* Execute stmt direct via jdbc
* @param stmt, the raw stmt string
*/
public void executeStmt(String stmt) {
makeSureInitialized();
jdbcClient.executeStmt(stmt);
}
}

View File

@ -467,6 +467,24 @@ public abstract class JdbcClient {
return databaseMetaData.getColumns(catalogName, schemaName, tableName, null);
}
/**
* Execute stmt direct via jdbc
* @param origStmt, the raw stmt string
*/
public void executeStmt(String origStmt) {
Connection conn = getConnection();
Statement stmt = null;
try {
stmt = conn.createStatement();
int effectedRows = stmt.executeUpdate(origStmt);
LOG.debug("finished to execute dml stmt: {}, effected rows: {}", origStmt, effectedRows);
} catch (SQLException e) {
throw new JdbcClientException("Failed to execute stmt. error: " + e.getMessage(), e);
} finally {
close(stmt, conn);
}
}
@Data
protected static class JdbcFieldSchema {
protected String columnName;
@ -501,3 +519,4 @@ public abstract class JdbcClient {
return ScalarType.createStringType();
}
}

View File

@ -52,6 +52,7 @@ import org.apache.doris.nereids.DorisParser.BooleanLiteralContext;
import org.apache.doris.nereids.DorisParser.BracketJoinHintContext;
import org.apache.doris.nereids.DorisParser.BracketRelationHintContext;
import org.apache.doris.nereids.DorisParser.BuildModeContext;
import org.apache.doris.nereids.DorisParser.CallProcedureContext;
import org.apache.doris.nereids.DorisParser.CollateContext;
import org.apache.doris.nereids.DorisParser.ColumnDefContext;
import org.apache.doris.nereids.DorisParser.ColumnDefsContext;
@ -318,6 +319,7 @@ import org.apache.doris.nereids.trees.plans.algebra.SetOperation.Qualifier;
import org.apache.doris.nereids.trees.plans.commands.AddConstraintCommand;
import org.apache.doris.nereids.trees.plans.commands.AlterMTMVCommand;
import org.apache.doris.nereids.trees.plans.commands.BatchInsertIntoTableCommand;
import org.apache.doris.nereids.trees.plans.commands.CallCommand;
import org.apache.doris.nereids.trees.plans.commands.Command;
import org.apache.doris.nereids.trees.plans.commands.Constraint;
import org.apache.doris.nereids.trees.plans.commands.CreateMTMVCommand;
@ -2998,4 +3000,13 @@ public class LogicalPlanBuilder extends DorisParserBaseVisitor<Object> {
return new TableSample(rows, false, seek);
}
@Override
public Object visitCallProcedure(CallProcedureContext ctx) {
String functionName = ctx.functionName.getText();
List<Expression> arguments = ctx.expression().stream()
.<Expression>map(this::typedVisit)
.collect(ImmutableList.toImmutableList());
UnboundFunction unboundFunction = new UnboundFunction(functionName, arguments);
return new CallCommand(unboundFunction);
}
}

View File

@ -132,5 +132,6 @@ public enum PlanType {
ADD_CONSTRAINT_COMMAND,
DROP_CONSTRAINT_COMMAND,
REFRESH_MTMV_COMMAND,
DROP_MTMV_COMMAND
DROP_MTMV_COMMAND,
CALL_COMMAND
}

View File

@ -0,0 +1,59 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.trees.plans.commands;
import org.apache.doris.nereids.analyzer.UnboundFunction;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.commands.call.CallFunc;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.StmtExecutor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.Objects;
/**
* call func()
*/
public class CallCommand extends Command implements ForwardWithSync {
public static final Logger LOG = LogManager.getLogger(CallCommand.class);
private final UnboundFunction unboundFunction;
/**
* constructor
*/
public CallCommand(UnboundFunction unboundFunction) {
super(PlanType.CALL_COMMAND);
this.unboundFunction = Objects.requireNonNull(unboundFunction, "function is null");
}
@Override
public void run(ConnectContext ctx, StmtExecutor executor) throws Exception {
CallFunc analyzedFunc = CallFunc.getFunc(ctx.getCurrentUserIdentity(), unboundFunction);
analyzedFunc.run();
}
@Override
public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
return visitor.visitCallCommand(this, context);
}
}

View File

@ -0,0 +1,102 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.trees.plans.commands.call;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Env;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.datasource.jdbc.JdbcExternalCatalog;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.literal.Literal;
import java.util.List;
import java.util.Objects;
/**
* EXECUTE_STMT("catalog_name", "stmt")
*/
public class CallExecuteStmtFunc extends CallFunc {
private final UserIdentity user;
private final String catalogName;
private final String stmt;
private CallExecuteStmtFunc(UserIdentity user, String catalogName, String stmt) {
this.user = Objects.requireNonNull(user, "user is missing");
this.catalogName = Objects.requireNonNull(catalogName, "catalogName is missing");
this.stmt = Objects.requireNonNull(stmt, "stmt is missing");
}
/**
* Create a CallFunc
*/
public static CallFunc create(UserIdentity user, List<Expression> args) {
if (args.size() != 2) {
throw new AnalysisException("EXECUTE_STMT function must have 2 arguments: 'catalog name' and 'stmt'");
}
String catalogName = null;
String stmt = null;
for (int i = 0; i < args.size(); i++) {
if (!(args.get(i).isConstant())) {
throw new AnalysisException("Argument of EXECUTE_STMT function must be constant string");
}
if (!(args.get(i) instanceof Literal)) {
throw new AnalysisException("Argument of EXECUTE_STMT function must be constant string");
}
Literal literal = (Literal) args.get(i);
if (!literal.isStringLikeLiteral()) {
throw new AnalysisException("Argument of EXECUTE_STMT function must be constant string");
}
String rawString = literal.getStringValue();
switch (i) {
case 0:
catalogName = rawString;
break;
case 1:
stmt = rawString;
break;
default:
throw new AnalysisException("EXECUTE_STMT function must have 2 arguments");
}
}
return new CallExecuteStmtFunc(user, catalogName, stmt);
}
@Override
public void run() {
CatalogIf catalogIf = Env.getCurrentEnv().getCatalogMgr().getCatalog(catalogName);
if (catalogIf == null) {
throw new AnalysisException("catalog not found: " + catalogName);
}
if (!(catalogIf instanceof JdbcExternalCatalog)) {
throw new AnalysisException("Only support JDBC catalog");
}
// check priv
if (!Env.getCurrentEnv().getAuth().checkCtlPriv(user, catalogName, PrivPredicate.LOAD)) {
throw new AnalysisException("user " + user + " has no privilege to execute stmt in catalog " + catalogName);
}
JdbcExternalCatalog catalog = (JdbcExternalCatalog) catalogIf;
catalog.executeStmt(stmt);
}
}

View File

@ -0,0 +1,42 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.trees.plans.commands.call;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.nereids.analyzer.UnboundFunction;
/**
* call function
*/
public abstract class CallFunc {
/**
* Get the instance of CallFunc
*/
public static CallFunc getFunc(UserIdentity user, UnboundFunction unboundFunction) {
String funcName = unboundFunction.getName().toUpperCase();
switch (funcName) {
case "EXECUTE_STMT":
return CallExecuteStmtFunc.create(user, unboundFunction.getArguments());
default:
throw new IllegalArgumentException("unknown function name: " + funcName);
}
}
public abstract void run();
}

View File

@ -20,6 +20,7 @@ package org.apache.doris.nereids.trees.plans.visitor;
import org.apache.doris.nereids.trees.plans.commands.AddConstraintCommand;
import org.apache.doris.nereids.trees.plans.commands.AlterMTMVCommand;
import org.apache.doris.nereids.trees.plans.commands.BatchInsertIntoTableCommand;
import org.apache.doris.nereids.trees.plans.commands.CallCommand;
import org.apache.doris.nereids.trees.plans.commands.Command;
import org.apache.doris.nereids.trees.plans.commands.CreateMTMVCommand;
import org.apache.doris.nereids.trees.plans.commands.CreatePolicyCommand;
@ -106,4 +107,8 @@ public interface CommandVisitor<R, C> {
default R visitDropMTMVCommand(DropMTMVCommand dropMTMVCommand, C context) {
return visitCommand(dropMTMVCommand, context);
}
default R visitCallCommand(CallCommand callCommand, C context) {
return visitCommand(callCommand, context);
}
}