diff --git a/be/src/vec/exec/vjdbc_connector.cpp b/be/src/vec/exec/vjdbc_connector.cpp index d544227f02..d5427fee49 100644 --- a/be/src/vec/exec/vjdbc_connector.cpp +++ b/be/src/vec/exec/vjdbc_connector.cpp @@ -202,7 +202,9 @@ Status JdbcConnector::query() { } LOG(INFO) << "JdbcConnector::query has exec success: " << _sql_str; - RETURN_IF_ERROR(_check_column_type()); + if (_conn_param.table_type != TOdbcTableType::NEBULA) { + RETURN_IF_ERROR(_check_column_type()); + } return Status::OK(); } diff --git a/docs/en/docs/lakehouse/external-table/jdbc.md b/docs/en/docs/lakehouse/external-table/jdbc.md index 66ebcfdbbf..2b2844f2bc 100644 --- a/docs/en/docs/lakehouse/external-table/jdbc.md +++ b/docs/en/docs/lakehouse/external-table/jdbc.md @@ -265,6 +265,98 @@ PROPERTIES ( ); ``` +### 9.Nebula-graph Test (only supports queries) +| Nebula version | Nebula JDBC Driver Version | +|------------|-------------------| +| 3.0.0 | nebula-jdbc-3.0.0-jar-with-dependencies.jar | + + +``` +#step1.create test data in nebula +#1.1 create tag +(root@nebula) [basketballplayer]> CREATE TAG test_type(t_str string, + t_int int, + t_date date, + t_datetime datetime, + t_bool bool, + t_timestamp timestamp, + t_float float, + t_double double +); +#1.2 insert test data +(root@nebula) [basketballplayer]> INSERT VERTEX test_type(t_str,t_int,t_date,t_datetime,t_bool,t_timestamp,t_float,t_double) values "zhangshan":("zhangshan",1000,date("2023-01-01"),datetime("2023-01-23 15:23:32"),true,1234242423,1.2,1.35); +#1.3 check the data +(root@nebula) [basketballplayer]> match (v:test_type) where id(v)=="zhangshan" return v.test_type.t_str,v.test_type.t_int,v.test_type.t_date,v.test_type.t_datetime,v.test_type.t_bool,v.test_type.t_timestamp,v.test_type.t_float,v.test_type.t_double,v limit 30; 
++-------------------+-------------------+--------------------+----------------------------+--------------------+-------------------------+---------------------+----------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| v.test_type.t_str | v.test_type.t_int | v.test_type.t_date | v.test_type.t_datetime | v.test_type.t_bool | v.test_type.t_timestamp | v.test_type.t_float | v.test_type.t_double | v | ++-------------------+-------------------+--------------------+----------------------------+--------------------+-------------------------+---------------------+----------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| "zhangshan" | 1000 | 2023-01-01 | 2023-01-23T15:23:32.000000 | true | 1234242423 | 1.2000000476837158 | 1.35 | ("zhangshan" :test_type{t_bool: true, t_date: 2023-01-01, t_datetime: 2023-01-23T15:23:32.000000, t_double: 1.35, t_float: 1.2000000476837158, t_int: 1000, t_str: "zhangshan", t_timestamp: 1234242423}) | ++-------------------+-------------------+--------------------+----------------------------+--------------------+-------------------------+---------------------+----------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +Got 1 rows (time spent 1616/2048 us) +Mon, 17 Apr 2023 17:23:14 CST +#step2. 
create table in doris +#2.1 create a resource +MySQL [test_db]> CREATE EXTERNAL RESOURCE gg_jdbc_resource +properties ( + "type"="jdbc", + "user"="root", + "password"="123", + "jdbc_url"="jdbc:nebula://127.0.0.1:9669/basketballplayer", + "driver_url"="file:///home/clz/baidu/bdg/doris/be/lib/nebula-jdbc-3.0.0-jar-with-dependencies.jar", --only local paths are supported; the jar must be placed in the be/lib directory-- + "driver_class"="com.vesoft.nebula.jdbc.NebulaDriver" +); +#2.2 Create an external table; it mainly tells Doris how to parse the data returned by Nebula Graph +MySQL [test_db]> CREATE TABLE `test_type` ( + `t_str` varchar(64), + `t_int` bigint, + `t_date` date, + `t_datetime` datetime, + `t_bool` boolean, + `t_timestamp` bigint, + `t_float` double, + `t_double` double, + `t_vertx` varchar(128) --a vertex maps to varchar in Doris-- +) ENGINE=JDBC +PROPERTIES ( +"resource" = "gg_jdbc_resource", +"table" = "xx", --a graph has no concept of tables, so fill in any value here; it is not used-- +"table_type"="nebula" +); +#2.3 Query the external graph table, using the g() function to pass the nGQL through to Nebula unchanged +MySQL [test_db]> select * from test_type where g('match (v:test_type) where id(v)=="zhangshan" return v.test_type.t_str,v.test_type.t_int,v.test_type.t_date,v.test_type.t_datetime,v.test_type.t_bool,v.test_type.t_timestamp,v.test_type.t_float,v.test_type.t_double,v')\G; +*************************** 1. 
row *************************** + t_str: zhangshan + t_int: 1000 + t_date: 2023-01-01 + t_datetime: 2023-01-23 15:23:32 + t_bool: 1 +t_timestamp: 1234242423 + t_float: 1.2000000476837158 + t_double: 1.35 + t_vertx: ("zhangshan" :test_type {t_datetime: utc datetime: 2023-01-23T15:23:32.000000, timezoneOffset: 0, t_timestamp: 1234242423, t_date: 2023-01-01, t_double: 1.35, t_str: "zhangshan", t_int: 1000, t_bool: true, t_float: 1.2000000476837158}) +1 row in set (0.024 sec) +#2.4 Join with other tables in Doris +#Assuming there is a user table +MySQL [test_db]> select * from t_user; ++-----------+------+---------------------------------+ +| username | age | addr | ++-----------+------+---------------------------------+ +| zhangshan | 26 | 北京市西二旗街道1008号 | ++-----------+------+---------------------------------+ +| lisi | 29 | 北京市西二旗街道1007号 | ++-----------+------+---------------------------------+ +1 row in set (0.013 sec) +#Join with this table to query user-related information +MySQL [test_db]> select u.* from (select t_str username from test_type where g('match (v:test_type) where id(v)=="zhangshan" return v.test_type.t_str limit 1')) g left join t_user u on g.username=u.username; ++-----------+------+---------------------------------+ +| username | age | addr | ++-----------+------+---------------------------------+ +| zhangshan | 26 | 北京市西二旗街道1008号 | ++-----------+------+---------------------------------+ +1 row in set (0.029 sec) +``` + + > **Note:** > > When creating an OceanBase external table, you only need to specify the `oceanbase mode` parameter when creating a resource, and the table type of the table to be created is oceanbase @@ -411,6 +503,17 @@ The followings list how data types in different databases are mapped in Doris. 
For MySQL mode, please refer to [MySQL type mapping](#MySQL) For Oracle mode, please refer to [Oracle type mapping](#Oracle) +### Nebula-graph +| nebula | Doris | +|:------------:|:-------------------:| +| tinyint/smallint/int/int64 | bigint | +| double/float | double | +| date | date | +| timestamp | bigint | +| datetime | datetime | +| bool | boolean | +| vertex/edge/path/list/set/time etc. | varchar | + ## Q&A See the FAQ section in [JDBC](https://doris.apache.org/docs/dev/lakehouse/multi-catalog/jdbc/). diff --git a/docs/zh-CN/docs/lakehouse/external-table/jdbc.md b/docs/zh-CN/docs/lakehouse/external-table/jdbc.md index a43858be99..87894589c2 100644 --- a/docs/zh-CN/docs/lakehouse/external-table/jdbc.md +++ b/docs/zh-CN/docs/lakehouse/external-table/jdbc.md @@ -258,6 +258,97 @@ PROPERTIES ( "table_type"="oceanbase" ); ``` + +### 9.Nebula-graph测试 (仅支持查询) +| nebula版本 | JDBC驱动版本 | +|------------|-------------------| +| 3.0.0 | nebula-jdbc-3.0.0-jar-with-dependencies.jar | +``` +#step1.在nebula创建测试数据 +#1.1 创建结点 +(root@nebula) [basketballplayer]> CREATE TAG test_type(t_str string, + t_int int, + t_date date, + t_datetime datetime, + t_bool bool, + t_timestamp timestamp, + t_float float, + t_double double +); +#1.2 插入数据 +(root@nebula) [basketballplayer]> INSERT VERTEX test_type(t_str,t_int,t_date,t_datetime,t_bool,t_timestamp,t_float,t_double) values "zhangshan":("zhangshan",1000,date("2023-01-01"),datetime("2023-01-23 15:23:32"),true,1234242423,1.2,1.35); +#1.3 查询数据 +(root@nebula) [basketballplayer]> match (v:test_type) where id(v)=="zhangshan" return v.test_type.t_str,v.test_type.t_int,v.test_type.t_date,v.test_type.t_datetime,v.test_type.t_bool,v.test_type.t_timestamp,v.test_type.t_float,v.test_type.t_double,v limit 30; 
++-------------------+-------------------+--------------------+----------------------------+--------------------+-------------------------+---------------------+----------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| v.test_type.t_str | v.test_type.t_int | v.test_type.t_date | v.test_type.t_datetime | v.test_type.t_bool | v.test_type.t_timestamp | v.test_type.t_float | v.test_type.t_double | v | ++-------------------+-------------------+--------------------+----------------------------+--------------------+-------------------------+---------------------+----------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| "zhangshan" | 1000 | 2023-01-01 | 2023-01-23T15:23:32.000000 | true | 1234242423 | 1.2000000476837158 | 1.35 | ("zhangshan" :test_type{t_bool: true, t_date: 2023-01-01, t_datetime: 2023-01-23T15:23:32.000000, t_double: 1.35, t_float: 1.2000000476837158, t_int: 1000, t_str: "zhangshan", t_timestamp: 1234242423}) | ++-------------------+-------------------+--------------------+----------------------------+--------------------+-------------------------+---------------------+----------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +Got 1 rows (time spent 1616/2048 us) +Mon, 17 Apr 2023 17:23:14 CST +#step2.在doris中创建外表 +#2.1 创建一个resource +MySQL [test_db]> CREATE EXTERNAL RESOURCE gg_jdbc_resource +properties ( + "type"="jdbc", + "user"="root", + "password"="123", + "jdbc_url"="jdbc:nebula://127.0.0.1:9669/basketballplayer", + 
"driver_url"="file:///home/clz/baidu/bdg/doris/be/lib/nebula-jdbc-3.0.0-jar-with-dependencies.jar", --仅支持本地路径,需放到be/lib目录下-- + "driver_class"="com.vesoft.nebula.jdbc.NebulaDriver" +); +#2.2 创建一个外表,这个主要是告诉doris如何解析nebulagraph返回的数据 +MySQL [test_db]> CREATE TABLE `test_type` ( + `t_str` varchar(64), + `t_int` bigint, + `t_date` date, + `t_datetime` datetime, + `t_bool` boolean, + `t_timestamp` bigint, + `t_float` double, + `t_double` double, + `t_vertx` varchar(128) --vertex对应doris类型是varchar--- +) ENGINE=JDBC +PROPERTIES ( +"resource" = "gg_jdbc_resource", +"table" = "xx", --因为graph没有表的概念,这里随便填一个值-- +"table_type"="nebula" +); +#2.3 查询graph外表,用g()函数把图的nGQL透传给nebula +MySQL [test_db]> select * from test_type where g('match (v:test_type) where id(v)=="zhangshan" return v.test_type.t_str,v.test_type.t_int,v.test_type.t_date,v.test_type.t_datetime,v.test_type.t_bool,v.test_type.t_timestamp,v.test_type.t_float,v.test_type.t_double,v')\G; +*************************** 1. row *************************** + t_str: zhangshan + t_int: 1000 + t_date: 2023-01-01 + t_datetime: 2023-01-23 15:23:32 + t_bool: 1 +t_timestamp: 1234242423 + t_float: 1.2000000476837158 + t_double: 1.35 + t_vertx: ("zhangshan" :test_type {t_datetime: utc datetime: 2023-01-23T15:23:32.000000, timezoneOffset: 0, t_timestamp: 1234242423, t_date: 2023-01-01, t_double: 1.35, t_str: "zhangshan", t_int: 1000, t_bool: true, t_float: 1.2000000476837158}) +1 row in set (0.024 sec) +#2.4 与doris的其他表进行关联查询 +#假设有张用户表 +MySQL [test_db]> select * from t_user; ++-----------+------+---------------------------------+ +| username | age | addr | ++-----------+------+---------------------------------+ +| zhangshan | 26 | 北京市西二旗街道1008号 | ++-----------+------+---------------------------------+ +| lisi | 29 | 北京市西二旗街道1007号 | ++-----------+------+---------------------------------+ +1 row in set (0.013 sec) +#与这张用户表关联查询用户相关的信息 +MySQL [test_db]> select u.* from (select t_str username from test_type where g('match (v:test_type) where 
id(v)=="zhangshan" return v.test_type.t_str limit 1')) g left join t_user u on g.username=u.username; ++-----------+------+---------------------------------+ +| username | age | addr | ++-----------+------+---------------------------------+ +| zhangshan | 26 | 北京市西二旗街道1008号 | ++-----------+------+---------------------------------+ +1 row in set (0.029 sec) +``` + + > **注意:** > > 在创建OceanBase外表时,只需在创建Resource时指定`oceanbase_mode`参数,创建外表的table_type为oceanbase。 @@ -403,6 +494,17 @@ PROPERTIES ( MySQL 模式请参考 [MySQL类型映射](#MySQL) Oracle 模式请参考 [Oracle类型映射](#Oracle) +### Nebula-graph +| nebula | Doris | +|:------------:|:-------------------:| +| tinyint/smallint/int/int64 | bigint | +| double/float | double | +| date | date | +| timestamp | bigint | +| datetime | datetime | +| bool | boolean | +| vertex/edge/path/list/set/time等 | varchar | + ## Q&A 请参考 [JDBC Catalog](../multi-catalog/jdbc.md) 中的 常见问题一节。 diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcResource.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcResource.java index 8ca7440988..0c4daf9cc8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcResource.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcResource.java @@ -61,6 +61,7 @@ import java.util.Map; public class JdbcResource extends Resource { private static final Logger LOG = LogManager.getLogger(JdbcResource.class); + public static final String JDBC_NEBULA = "jdbc:nebula"; public static final String JDBC_MYSQL = "jdbc:mysql"; public static final String JDBC_MARIADB = "jdbc:mariadb"; public static final String JDBC_POSTGRESQL = "jdbc:postgresql"; @@ -72,6 +73,7 @@ public class JdbcResource extends Resource { public static final String JDBC_PRESTO = "jdbc:presto"; public static final String JDBC_OCEANBASE = "jdbc:oceanbase"; + public static final String NEBULA = "NEBULA"; public static final String MYSQL = "MYSQL"; public static final String POSTGRESQL = "POSTGRESQL"; public static final String ORACLE = 
"ORACLE"; @@ -296,6 +298,8 @@ public class JdbcResource extends Resource { } else { throw new DdlException("Invalid OceanBase mode: " + oceanbaseMode + ". Must be 'mysql' or 'oracle'"); } + } else if (url.startsWith(JDBC_NEBULA)) { + return NEBULA; } throw new DdlException("Unsupported jdbc database type, please check jdbcUrl: " + url); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcTable.java index 04dca97e5d..8699b53315 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcTable.java @@ -70,6 +70,7 @@ public class JdbcTable extends Table { static { Map tempMap = new CaseInsensitiveMap(); + tempMap.put("nebula", TOdbcTableType.NEBULA); tempMap.put("mysql", TOdbcTableType.MYSQL); tempMap.put("postgresql", TOdbcTableType.POSTGRESQL); tempMap.put("sqlserver", TOdbcTableType.SQLSERVER); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/JdbcScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/JdbcScanNode.java index d51b228141..4685046079 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/JdbcScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/JdbcScanNode.java @@ -20,6 +20,7 @@ package org.apache.doris.planner; import org.apache.doris.analysis.Analyzer; import org.apache.doris.analysis.Expr; import org.apache.doris.analysis.ExprSubstitutionMap; +import org.apache.doris.analysis.FunctionCallExpr; import org.apache.doris.analysis.SlotDescriptor; import org.apache.doris.analysis.SlotRef; import org.apache.doris.analysis.TupleDescriptor; @@ -53,6 +54,7 @@ public class JdbcScanNode extends ScanNode { private final List filters = new ArrayList(); private String tableName; private TOdbcTableType jdbcType; + private String graphQueryString = ""; public JdbcScanNode(PlanNodeId id, TupleDescriptor desc, boolean isJdbcExternalTable) { super(id, desc, 
"JdbcScanNode", StatisticalType.JDBC_SCAN_NODE); @@ -71,6 +73,31 @@ public class JdbcScanNode extends ScanNode { public void init(Analyzer analyzer) throws UserException { super.init(analyzer); computeStats(analyzer); + getGraphQueryString(); + } + + private boolean isNebula() { + return jdbcType == TOdbcTableType.NEBULA; + } + + /** + * For Nebula tables only: copies the string argument of the first g('...') conjunct into + * graphQueryString, then drops all conjuncts. Conjuncts that are not function calls are + * skipped rather than cast, so a mixed WHERE clause cannot raise a ClassCastException. + */ + private void getGraphQueryString() { + if (!isNebula()) { + return; + } + for (Expr expr : conjuncts) { + if (expr instanceof FunctionCallExpr + && "g".equals(((FunctionCallExpr) expr).getFnName().getFunction())) { + graphQueryString = expr.getChild(0).getStringValue(); + break; + } + } + // clear the conjuncts: the nGQL is sent to Nebula as-is, so this scan node does not use them + conjuncts = Lists.newArrayList(); } /** @@ -130,6 +157,9 @@ public class JdbcScanNode extends ScanNode { } private String getJdbcQueryStr() { + if (isNebula()) { + return graphQueryString; + } StringBuilder sql = new StringBuilder("SELECT "); // Oracle use the where clause to do top n diff --git a/fe/java-udf/pom.xml b/fe/java-udf/pom.xml index 17eedebf52..b4235959f0 100644 --- a/fe/java-udf/pom.xml +++ b/fe/java-udf/pom.xml @@ -35,6 +35,11 @@ under the License. 
1 + + com.vesoft + client + 3.0.0 + org.apache.doris fe-common diff --git a/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java b/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java index 0aa34d0a1a..a0f544f17d 100644 --- a/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java +++ b/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java @@ -29,6 +29,7 @@ import com.clickhouse.data.value.UnsignedInteger; import com.clickhouse.data.value.UnsignedLong; import com.clickhouse.data.value.UnsignedShort; import com.google.common.base.Preconditions; +import com.vesoft.nebula.client.graph.data.ValueWrapper; import org.apache.log4j.Logger; import org.apache.thrift.TDeserializer; import org.apache.thrift.TException; @@ -45,6 +46,7 @@ import java.net.MalformedURLException; import java.nio.charset.StandardCharsets; import java.sql.Connection; import java.sql.Date; +import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.ResultSetMetaData; @@ -77,6 +79,7 @@ public class JdbcExecutor { private int maxPoolSize; private int minIdleSize; private int maxIdelTime; + private TOdbcTableType tableType; public JdbcExecutor(byte[] thriftParams) throws Exception { TJdbcExecutorCtorParams request = new TJdbcExecutorCtorParams(); @@ -86,6 +89,7 @@ public class JdbcExecutor { } catch (TException e) { throw new InternalException(e.getMessage()); } + tableType = request.table_type; minPoolSize = Integer.valueOf(System.getProperty("JDBC_MIN_POOL", "1")); maxPoolSize = Integer.valueOf(System.getProperty("JDBC_MAX_POOL", "100")); maxIdelTime = Integer.valueOf(System.getProperty("JDBC_MAX_IDEL_TIME", "300000")); @@ -98,6 +102,10 @@ public class JdbcExecutor { request.jdbc_url, request.jdbc_user, request.jdbc_password, request.op, request.table_type); } + public boolean isNebula() { + return tableType == TOdbcTableType.NEBULA; + } + public void close() throws Exception { if (resultSet != null) { 
resultSet.close(); @@ -127,7 +135,9 @@ public class JdbcExecutor { resultColumnTypeNames = new ArrayList<>(columnCount); block = new ArrayList<>(columnCount); for (int i = 0; i < columnCount; ++i) { - resultColumnTypeNames.add(resultSetMetaData.getColumnClassName(i + 1)); + if (!isNebula()) { + resultColumnTypeNames.add(resultSetMetaData.getColumnClassName(i + 1)); + } block.add((Object[]) Array.newInstance(Object.class, batchSizeNum)); } return columnCount; @@ -200,7 +210,11 @@ public class JdbcExecutor { curBlockRows = 0; do { for (int i = 0; i < columnCount; ++i) { - block.get(i)[curBlockRows] = resultSet.getObject(i + 1); + if (isNebula()) { + block.get(i)[curBlockRows] = UdfUtils.convertObject((ValueWrapper) resultSet.getObject(i + 1)); + } else { + block.get(i)[curBlockRows] = resultSet.getObject(i + 1); + } } curBlockRows++; } while (curBlockRows < batchSize && resultSet.next()); @@ -254,45 +268,52 @@ public class JdbcExecutor { private void init(String driverUrl, String sql, int batchSize, String driverClass, String jdbcUrl, String jdbcUser, String jdbcPassword, TJdbcOperation op, TOdbcTableType tableType) throws UdfRuntimeException { try { - ClassLoader parent = getClass().getClassLoader(); - ClassLoader classLoader = UdfUtils.getClassLoader(driverUrl, parent); - druidDataSource = JdbcDataSource.getDataSource().getSource(jdbcUrl + jdbcUser + jdbcPassword); - if (druidDataSource == null) { - DruidDataSource ds = new DruidDataSource(); - ds.setDriverClassLoader(classLoader); - ds.setDriverClassName(driverClass); - ds.setUrl(jdbcUrl); - ds.setUsername(jdbcUser); - ds.setPassword(jdbcPassword); - ds.setMinIdle(minIdleSize); - ds.setInitialSize(minPoolSize); - ds.setMaxActive(maxPoolSize); - ds.setMaxWait(5000); - ds.setTestWhileIdle(true); - ds.setTestOnBorrow(false); - setValidationQuery(ds, tableType); - ds.setTimeBetweenEvictionRunsMillis(maxIdelTime / 5); - ds.setMinEvictableIdleTimeMillis(maxIdelTime); - druidDataSource = ds; - // here is a cache of 
datasource, which using the string(jdbcUrl + jdbcUser + - // jdbcPassword) as key. - // and the default datasource init = 1, min = 1, max = 100, if one of connection idle - // time greater than 10 minutes. then connection will be retrieved. - JdbcDataSource.getDataSource().putSource(jdbcUrl + jdbcUser + jdbcPassword, ds); - } - conn = druidDataSource.getConnection(); - if (op == TJdbcOperation.READ) { - conn.setAutoCommit(false); - Preconditions.checkArgument(sql != null); - stmt = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); - if (tableType == TOdbcTableType.MYSQL) { - stmt.setFetchSize(Integer.MIN_VALUE); - } else { - stmt.setFetchSize(batchSize); - } + if (isNebula()) { batchSizeNum = batchSize; + Class.forName(driverClass); + conn = DriverManager.getConnection(jdbcUrl, jdbcUser, jdbcPassword); + stmt = conn.prepareStatement(sql); } else { - stmt = conn.createStatement(); + ClassLoader parent = getClass().getClassLoader(); + ClassLoader classLoader = UdfUtils.getClassLoader(driverUrl, parent); + druidDataSource = JdbcDataSource.getDataSource().getSource(jdbcUrl + jdbcUser + jdbcPassword); + if (druidDataSource == null) { + DruidDataSource ds = new DruidDataSource(); + ds.setDriverClassLoader(classLoader); + ds.setDriverClassName(driverClass); + ds.setUrl(jdbcUrl); + ds.setUsername(jdbcUser); + ds.setPassword(jdbcPassword); + ds.setMinIdle(minIdleSize); + ds.setInitialSize(minPoolSize); + ds.setMaxActive(maxPoolSize); + ds.setMaxWait(5000); + ds.setTestWhileIdle(true); + ds.setTestOnBorrow(false); + setValidationQuery(ds, tableType); + ds.setTimeBetweenEvictionRunsMillis(maxIdelTime / 5); + ds.setMinEvictableIdleTimeMillis(maxIdelTime); + druidDataSource = ds; + // here is a cache of datasource, which using the string(jdbcUrl + jdbcUser + + // jdbcPassword) as key. + // and the default datasource init = 1, min = 1, max = 100, if one of connection idle + // time greater than 10 minutes. then connection will be retrieved. 
+ JdbcDataSource.getDataSource().putSource(jdbcUrl + jdbcUser + jdbcPassword, ds); + } + conn = druidDataSource.getConnection(); + if (op == TJdbcOperation.READ) { + conn.setAutoCommit(false); + Preconditions.checkArgument(sql != null); + stmt = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); + if (tableType == TOdbcTableType.MYSQL) { + stmt.setFetchSize(Integer.MIN_VALUE); + } else { + stmt.setFetchSize(batchSize); + } + batchSizeNum = batchSize; + } else { + stmt = conn.createStatement(); + } } } catch (MalformedURLException e) { throw new UdfRuntimeException("MalformedURLException to load class about " + driverUrl, e); @@ -300,6 +321,8 @@ public class JdbcExecutor { throw new UdfRuntimeException("Initialize datasource failed: ", e); } catch (FileNotFoundException e) { throw new UdfRuntimeException("FileNotFoundException failed: ", e); + } catch (Exception e) { + throw new UdfRuntimeException("Initialize datasource failed: ", e); } } diff --git a/fe/java-udf/src/main/java/org/apache/doris/udf/UdfUtils.java b/fe/java-udf/src/main/java/org/apache/doris/udf/UdfUtils.java index 6dda5278f8..41107d548d 100644 --- a/fe/java-udf/src/main/java/org/apache/doris/udf/UdfUtils.java +++ b/fe/java-udf/src/main/java/org/apache/doris/udf/UdfUtils.java @@ -29,6 +29,9 @@ import org.apache.doris.thrift.TTypeNode; import com.google.common.base.Preconditions; import com.google.common.collect.Sets; +import com.vesoft.nebula.client.graph.data.DateTimeWrapper; +import com.vesoft.nebula.client.graph.data.DateWrapper; +import com.vesoft.nebula.client.graph.data.ValueWrapper; import org.apache.log4j.Logger; import sun.misc.Unsafe; @@ -584,4 +587,56 @@ public class UdfUtils { } return bytes; } + + // Only used for Nebula Graph results: converts a ValueWrapper into a plain Java object that can be + // copied into the block. Returns null for unmatched value kinds and when the conversion throws. + public static Object convertObject(ValueWrapper value) { + try { + if (value.isLong()) { + return value.asLong(); + } + if (value.isBoolean()) { + return value.asBoolean(); + } 
+ if (value.isDouble()) { + return value.asDouble(); + } + if (value.isString()) { + return value.asString(); + } + if (value.isTime()) { + return value.asTime().toString(); + } + if (value.isDate()) { + DateWrapper date = value.asDate(); + return LocalDate.of(date.getYear(), date.getMonth(), date.getDay()); + } + if (value.isDateTime()) { + DateTimeWrapper dateTime = value.asDateTime(); + return LocalDateTime.of(dateTime.getYear(), dateTime.getMonth(), dateTime.getDay(), + dateTime.getHour(), dateTime.getMinute(), dateTime.getSecond(), dateTime.getMicrosec() * 1000); + } + if (value.isVertex()) { + return value.asNode().toString(); + } + if (value.isEdge()) { + return value.asRelationship().toString(); + } + if (value.isPath()) { + return value.asPath().toString(); + } + if (value.isList()) { + return value.asList().toString(); + } + if (value.isSet()) { + return value.asSet().toString(); + } + if (value.isMap()) { + return value.asMap().toString(); + } + return null; + } catch (Exception e) { + return null; + } + } } diff --git a/gensrc/script/doris_builtins_functions.py b/gensrc/script/doris_builtins_functions.py index 42aefece3a..6a302432c8 100644 --- a/gensrc/script/doris_builtins_functions.py +++ b/gensrc/script/doris_builtins_functions.py @@ -1456,7 +1456,8 @@ visible_functions = [ [['esquery'], 'BOOLEAN', ['MAP', 'VARCHAR'], ''], [['esquery'], 'BOOLEAN', ['STRING', 'VARCHAR'], ''], [['esquery'], 'BOOLEAN', ['VARIANT', 'VARCHAR'], ''], - + # used for accept graph sql + [['g'], 'BOOLEAN', ['VARCHAR'], ''], # String builtin functions [['substr', 'substring'], 'VARCHAR', ['VARCHAR', 'INT'], 'ALWAYS_NULLABLE'], [['substr', 'substring'], 'VARCHAR', ['VARCHAR', 'INT', 'INT'], 'ALWAYS_NULLABLE'], diff --git a/gensrc/thrift/Types.thrift b/gensrc/thrift/Types.thrift index bce407a77a..0c5c7d6f66 100644 --- a/gensrc/thrift/Types.thrift +++ b/gensrc/thrift/Types.thrift @@ -391,7 +391,8 @@ enum TOdbcTableType { TRINO, PRESTO, OCEANBASE, - OCEANBASE_ORACLE + 
OCEANBASE_ORACLE, + NEBULA } struct TJdbcExecutorCtorParams {