[feature](graph) Support querying data from the Nebula graph database (#19209)
Support querying data from the Nebula graph database. This feature comes from the needs of commercial customers who already use both Doris and Nebula and want to connect the two databases. The changes mainly include: * add a new graph database JDBC type * adapt the Nebula types and map graph values to Doris types
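At a high level the change does two things: when a JDBC table is of type Nebula, the string inside the g('...') function is forwarded to the graph database as-is instead of a generated SQL statement, and every value coming back is converted into a Java object that the mapped Doris column type can hold. A minimal sketch of those two ideas, assuming the nebula-java client is on the classpath (class and method names here are illustrative, not the actual Doris code in the diff below):

```
// Illustrative sketch only; simplified, hypothetical helpers.
import com.vesoft.nebula.client.graph.data.ValueWrapper;

public final class NebulaPassthroughSketch {
    // Idea 1: for a Nebula table, the nGQL inside g('...') is used as the query string itself.
    static String buildQuery(boolean isNebula, String graphQuery, String generatedSql) {
        return isNebula ? graphQuery : generatedSql;
    }

    // Idea 2: convert a returned graph value into the Java type the Doris column expects.
    static Object toDorisValue(ValueWrapper v) throws Exception {
        if (v.isLong()) {
            return v.asLong();            // -> bigint
        }
        if (v.isDouble()) {
            return v.asDouble();          // -> double
        }
        if (v.isString()) {
            return v.asString();          // -> varchar
        }
        if (v.isVertex()) {
            return v.asNode().toString(); // vertexes (and edges, paths, ...) -> varchar
        }
        return null;                      // remaining kinds are handled analogously
    }
}
```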
@ -202,7 +202,9 @@ Status JdbcConnector::query() {
|
||||
}
|
||||
|
||||
LOG(INFO) << "JdbcConnector::query has exec success: " << _sql_str;
|
||||
RETURN_IF_ERROR(_check_column_type());
|
||||
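+   // Graph results are converted on the Java side (see UdfUtils.convertObject), so the relational column type check is skipped for Nebula.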
+   if (_conn_param.table_type != TOdbcTableType::NEBULA) {
+       RETURN_IF_ERROR(_check_column_type());
+   }
    return Status::OK();
}

@ -265,6 +265,98 @@ PROPERTIES (
);
```

### 9.Nebula-graph Test (only supports queries)

| Nebula Version | Nebula JDBC Driver Version |
|------------|-------------------|
| 3.0.0 | nebula-jdbc-3.0.0-jar-with-dependencies.jar |

```
#step1. create test data in nebula
#1.1 create tag
(root@nebula) [basketballplayer]> CREATE TAG test_type(t_str string,
t_int int,
t_date date,
t_datetime datetime,
t_bool bool,
t_timestamp timestamp,
t_float float,
t_double double
);
#1.2 insert test data
(root@nebula) [basketballplayer]> INSERT VERTEX test_type(t_str,t_int,t_date,t_datetime,t_bool,t_timestamp,t_float,t_double) values "zhangshan":("zhangshan",1000,date("2023-01-01"),datetime("2023-01-23 15:23:32"),true,1234242423,1.2,1.35);
#1.3 check the data
(root@nebula) [basketballplayer]> match (v:test_type) where id(v)=="zhangshan" return v.test_type.t_str,v.test_type.t_int,v.test_type.t_date,v.test_type.t_datetime,v.test_type.t_bool,v.test_type.t_timestamp,v.test_type.t_float,v.test_type.t_double,v limit 30;
+-------------------+-------------------+--------------------+----------------------------+--------------------+-------------------------+---------------------+----------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| v.test_type.t_str | v.test_type.t_int | v.test_type.t_date | v.test_type.t_datetime    | v.test_type.t_bool | v.test_type.t_timestamp | v.test_type.t_float | v.test_type.t_double | v                                                                                                                                                                                                           |
+-------------------+-------------------+--------------------+----------------------------+--------------------+-------------------------+---------------------+----------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| "zhangshan"       | 1000              | 2023-01-01         | 2023-01-23T15:23:32.000000 | true               | 1234242423              | 1.2000000476837158  | 1.35                 | ("zhangshan" :test_type{t_bool: true, t_date: 2023-01-01, t_datetime: 2023-01-23T15:23:32.000000, t_double: 1.35, t_float: 1.2000000476837158, t_int: 1000, t_str: "zhangshan", t_timestamp: 1234242423}) |
+-------------------+-------------------+--------------------+----------------------------+--------------------+-------------------------+---------------------+----------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
Got 1 rows (time spent 1616/2048 us)
Mon, 17 Apr 2023 17:23:14 CST
#step2. create table in doris
#2.1 create a resource
MySQL [test_db]> CREATE EXTERNAL RESOURCE gg_jdbc_resource
properties (
"type"="jdbc",
"user"="root",
"password"="123",
"jdbc_url"="jdbc:nebula://127.0.0.1:9669/basketballplayer",
"driver_url"="file:///home/clz/baidu/bdg/doris/be/lib/nebula-jdbc-3.0.0-jar-with-dependencies.jar", -- the jar needs to be placed in the be/lib directory --
"driver_class"="com.vesoft.nebula.jdbc.NebulaDriver"
);
#2.2 create an external table; it mainly tells Doris how to parse the data returned by NebulaGraph
MySQL [test_db]> CREATE TABLE `test_type` (
`t_str` varchar(64),
`t_int` bigint,
`t_date` date,
`t_datetime` datetime,
`t_bool` boolean,
`t_timestamp` bigint,
`t_float` double,
`t_double` double,
`t_vertx` varchar(128) -- vertex maps to varchar in Doris --
) ENGINE=JDBC
PROPERTIES (
"resource" = "gg_jdbc_resource",
"table" = "xx", -- the graph has no notion of tables; fill in any value here, it is not used --
"table_type"="nebula"
);
#2.3 query the graph external table, using the g() function to pass the graph's nGQL through to Nebula
MySQL [test_db]> select * from test_type where g('match (v:test_type) where id(v)=="zhangshan" return v.test_type.t_str,v.test_type.t_int,v.test_type.t_date,v.test_type.t_datetime,v.test_type.t_bool,v.test_type.t_timestamp,v.test_type.t_float,v.test_type.t_double,v')\G;
*************************** 1. row ***************************
t_str: zhangshan
t_int: 1000
t_date: 2023-01-01
t_datetime: 2023-01-23 15:23:32
t_bool: 1
t_timestamp: 1234242423
t_float: 1.2000000476837158
t_double: 1.35
t_vertx: ("zhangshan" :test_type {t_datetime: utc datetime: 2023-01-23T15:23:32.000000, timezoneOffset: 0, t_timestamp: 1234242423, t_date: 2023-01-01, t_double: 1.35, t_str: "zhangshan", t_int: 1000, t_bool: true, t_float: 1.2000000476837158})
1 row in set (0.024 sec)
#2.4 join with other tables in Doris
#assuming there is a user table
MySQL [test_db]> select * from t_user;
+-----------+------+---------------------------------+
| username  | age  | addr                            |
+-----------+------+---------------------------------+
| zhangshan |   26 | 北京市西二旗街道1008号          |
| lisi      |   29 | 北京市西二旗街道1007号          |
+-----------+------+---------------------------------+
2 rows in set (0.013 sec)
#join with this table to query user-related information
MySQL [test_db]> select u.* from (select t_str username from test_type where g('match (v:test_type) where id(v)=="zhangshan" return v.test_type.t_str limit 1')) g left join t_user u on g.username=u.username;
+-----------+------+---------------------------------+
| username  | age  | addr                            |
+-----------+------+---------------------------------+
| zhangshan |   26 | 北京市西二旗街道1008号          |
+-----------+------+---------------------------------+
1 row in set (0.029 sec)
```

> **Note:**
>
> When creating an OceanBase external table, you only need to specify the `oceanbase_mode` parameter when creating the resource, and set the table_type of the table to be created to oceanbase.
@ -411,6 +503,17 @@ The followings list how data types in different databases are mapped in Doris.
For MySQL mode, please refer to [MySQL type mapping](#MySQL)
For Oracle mode, please refer to [Oracle type mapping](#Oracle)

### Nebula-graph
| nebula | Doris |
|:------------:|:-------------------:|
| tinyint/smallint/int/int64 | bigint |
| double/float | double |
| date | date |
| timestamp | bigint |
| datetime | datetime |
| bool | boolean |
| vertex/edge/path/list/set/time etc | varchar |

## Q&A

See the FAQ section in [JDBC](https://doris.apache.org/docs/dev/lakehouse/multi-catalog/jdbc/).

@ -258,6 +258,97 @@ PROPERTIES (
"table_type"="oceanbase"
);
```

### 9.Nebula-graph Test (queries only)
| Nebula Version | JDBC Driver Version |
|------------|-------------------|
| 3.0.0 | nebula-jdbc-3.0.0-jar-with-dependencies.jar |
```
#step1. create test data in nebula
#1.1 create a tag
(root@nebula) [basketballplayer]> CREATE TAG test_type(t_str string,
t_int int,
t_date date,
t_datetime datetime,
t_bool bool,
t_timestamp timestamp,
t_float float,
t_double double
);
#1.2 insert data
(root@nebula) [basketballplayer]> INSERT VERTEX test_type(t_str,t_int,t_date,t_datetime,t_bool,t_timestamp,t_float,t_double) values "zhangshan":("zhangshan",1000,date("2023-01-01"),datetime("2023-01-23 15:23:32"),true,1234242423,1.2,1.35);
#1.3 query the data
(root@nebula) [basketballplayer]> match (v:test_type) where id(v)=="zhangshan" return v.test_type.t_str,v.test_type.t_int,v.test_type.t_date,v.test_type.t_datetime,v.test_type.t_bool,v.test_type.t_timestamp,v.test_type.t_float,v.test_type.t_double,v limit 30;
+-------------------+-------------------+--------------------+----------------------------+--------------------+-------------------------+---------------------+----------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| v.test_type.t_str | v.test_type.t_int | v.test_type.t_date | v.test_type.t_datetime    | v.test_type.t_bool | v.test_type.t_timestamp | v.test_type.t_float | v.test_type.t_double | v                                                                                                                                                                                                           |
+-------------------+-------------------+--------------------+----------------------------+--------------------+-------------------------+---------------------+----------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| "zhangshan"       | 1000              | 2023-01-01         | 2023-01-23T15:23:32.000000 | true               | 1234242423              | 1.2000000476837158  | 1.35                 | ("zhangshan" :test_type{t_bool: true, t_date: 2023-01-01, t_datetime: 2023-01-23T15:23:32.000000, t_double: 1.35, t_float: 1.2000000476837158, t_int: 1000, t_str: "zhangshan", t_timestamp: 1234242423}) |
+-------------------+-------------------+--------------------+----------------------------+--------------------+-------------------------+---------------------+----------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
Got 1 rows (time spent 1616/2048 us)
Mon, 17 Apr 2023 17:23:14 CST
#step2. create the external table in doris
#2.1 create a resource
MySQL [test_db]> CREATE EXTERNAL RESOURCE gg_jdbc_resource
properties (
"type"="jdbc",
"user"="root",
"password"="123",
"jdbc_url"="jdbc:nebula://127.0.0.1:9669/basketballplayer",
"driver_url"="file:///home/clz/baidu/bdg/doris/be/lib/nebula-jdbc-3.0.0-jar-with-dependencies.jar", -- only a local path is supported; the jar needs to be placed in the be/lib directory --
"driver_class"="com.vesoft.nebula.jdbc.NebulaDriver"
);
#2.2 create an external table; it mainly tells Doris how to parse the data returned by NebulaGraph
MySQL [test_db]> CREATE TABLE `test_type` (
`t_str` varchar(64),
`t_int` bigint,
`t_date` date,
`t_datetime` datetime,
`t_bool` boolean,
`t_timestamp` bigint,
`t_float` double,
`t_double` double,
`t_vertx` varchar(128) -- vertex maps to varchar in Doris --
) ENGINE=JDBC
PROPERTIES (
"resource" = "gg_jdbc_resource",
"table" = "xx", -- the graph has no notion of tables; fill in any value here --
"table_type"="nebula"
);
#2.3 query the graph external table, using the g() function to pass the graph's nGQL through to Nebula
MySQL [test_db]> select * from test_type where g('match (v:test_type) where id(v)=="zhangshan" return v.test_type.t_str,v.test_type.t_int,v.test_type.t_date,v.test_type.t_datetime,v.test_type.t_bool,v.test_type.t_timestamp,v.test_type.t_float,v.test_type.t_double,v')\G;
*************************** 1. row ***************************
t_str: zhangshan
t_int: 1000
t_date: 2023-01-01
t_datetime: 2023-01-23 15:23:32
t_bool: 1
t_timestamp: 1234242423
t_float: 1.2000000476837158
t_double: 1.35
t_vertx: ("zhangshan" :test_type {t_datetime: utc datetime: 2023-01-23T15:23:32.000000, timezoneOffset: 0, t_timestamp: 1234242423, t_date: 2023-01-01, t_double: 1.35, t_str: "zhangshan", t_int: 1000, t_bool: true, t_float: 1.2000000476837158})
1 row in set (0.024 sec)
#2.4 join with other tables in Doris
#assuming there is a user table
MySQL [test_db]> select * from t_user;
+-----------+------+---------------------------------+
| username  | age  | addr                            |
+-----------+------+---------------------------------+
| zhangshan |   26 | 北京市西二旗街道1008号          |
| lisi      |   29 | 北京市西二旗街道1007号          |
+-----------+------+---------------------------------+
2 rows in set (0.013 sec)
#join with this table to query user-related information
MySQL [test_db]> select u.* from (select t_str username from test_type where g('match (v:test_type) where id(v)=="zhangshan" return v.test_type.t_str limit 1')) g left join t_user u on g.username=u.username;
+-----------+------+---------------------------------+
| username  | age  | addr                            |
+-----------+------+---------------------------------+
| zhangshan |   26 | 北京市西二旗街道1008号          |
+-----------+------+---------------------------------+
1 row in set (0.029 sec)
```

> **Note:**
>
> When creating an OceanBase external table, you only need to specify the `oceanbase_mode` parameter when creating the Resource, and set the table_type of the external table to oceanbase.
@ -403,6 +494,17 @@ PROPERTIES (
For MySQL mode, please refer to [MySQL type mapping](#MySQL)
For Oracle mode, please refer to [Oracle type mapping](#Oracle)

### Nebula-graph
| nebula | Doris |
|:------------:|:-------------------:|
| tinyint/smallint/int/int64 | bigint |
| double/float | double |
| date | date |
| timestamp | bigint |
| datetime | datetime |
| bool | boolean |
| vertex/edge/path/list/set/time etc | varchar |

## Q&A

See the FAQ section in [JDBC Catalog](../multi-catalog/jdbc.md).

@ -61,6 +61,7 @@ import java.util.Map;
public class JdbcResource extends Resource {
private static final Logger LOG = LogManager.getLogger(JdbcResource.class);

public static final String JDBC_NEBULA = "jdbc:nebula";
public static final String JDBC_MYSQL = "jdbc:mysql";
public static final String JDBC_MARIADB = "jdbc:mariadb";
public static final String JDBC_POSTGRESQL = "jdbc:postgresql";
@ -72,6 +73,7 @@ public class JdbcResource extends Resource {
public static final String JDBC_PRESTO = "jdbc:presto";
public static final String JDBC_OCEANBASE = "jdbc:oceanbase";

public static final String NEBULA = "NEBULA";
public static final String MYSQL = "MYSQL";
public static final String POSTGRESQL = "POSTGRESQL";
public static final String ORACLE = "ORACLE";
@ -296,6 +298,8 @@ public class JdbcResource extends Resource {
} else {
throw new DdlException("Invalid OceanBase mode: " + oceanbaseMode + ". Must be 'mysql' or 'oracle'");
}
} else if (url.startsWith(JDBC_NEBULA)) {
return NEBULA;
}
throw new DdlException("Unsupported jdbc database type, please check jdbcUrl: " + url);
}

@ -70,6 +70,7 @@ public class JdbcTable extends Table {

static {
Map<String, TOdbcTableType> tempMap = new CaseInsensitiveMap();
tempMap.put("nebula", TOdbcTableType.NEBULA);
tempMap.put("mysql", TOdbcTableType.MYSQL);
tempMap.put("postgresql", TOdbcTableType.POSTGRESQL);
tempMap.put("sqlserver", TOdbcTableType.SQLSERVER);

@ -20,6 +20,7 @@ package org.apache.doris.planner;
import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.ExprSubstitutionMap;
import org.apache.doris.analysis.FunctionCallExpr;
import org.apache.doris.analysis.SlotDescriptor;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.TupleDescriptor;
@ -53,6 +54,7 @@ public class JdbcScanNode extends ScanNode {
private final List<String> filters = new ArrayList<String>();
private String tableName;
private TOdbcTableType jdbcType;
private String graphQueryString = "";

public JdbcScanNode(PlanNodeId id, TupleDescriptor desc, boolean isJdbcExternalTable) {
super(id, desc, "JdbcScanNode", StatisticalType.JDBC_SCAN_NODE);
@ -71,6 +73,26 @@ public class JdbcScanNode extends ScanNode {
public void init(Analyzer analyzer) throws UserException {
super.init(analyzer);
computeStats(analyzer);
getGraphQueryString();
}

private boolean isNebula() {
return jdbcType == TOdbcTableType.NEBULA;
}

private void getGraphQueryString() {
if (!isNebula()) {
return;
}
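// Find the g('...') conjunct and take its string argument as the nGQL statement to send to Nebula verbatim.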
for (Expr expr : conjuncts) {
FunctionCallExpr functionCallExpr = (FunctionCallExpr) expr;
if ("g".equals(functionCallExpr.getFnName().getFunction())) {
graphQueryString = functionCallExpr.getChild(0).getStringValue();
break;
}
}
// clear the conjuncts: the graph scan node does not need them
conjuncts = Lists.newArrayList();
}

/**
@ -130,6 +152,9 @@ public class JdbcScanNode extends ScanNode {
}

private String getJdbcQueryStr() {
if (isNebula()) {
return graphQueryString;
}
StringBuilder sql = new StringBuilder("SELECT ");

// Oracle use the where clause to do top n

@ -35,6 +35,11 @@ under the License.
<fe_ut_parallel>1</fe_ut_parallel>
</properties>
<dependencies>
<dependency>
<groupId>com.vesoft</groupId>
<artifactId>client</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.doris</groupId>
<artifactId>fe-common</artifactId>

@ -29,6 +29,7 @@ import com.clickhouse.data.value.UnsignedInteger;
import com.clickhouse.data.value.UnsignedLong;
import com.clickhouse.data.value.UnsignedShort;
import com.google.common.base.Preconditions;
import com.vesoft.nebula.client.graph.data.ValueWrapper;
import org.apache.log4j.Logger;
import org.apache.thrift.TDeserializer;
import org.apache.thrift.TException;
@ -45,6 +46,7 @@ import java.net.MalformedURLException;
import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.Date;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
@ -77,6 +79,7 @@ public class JdbcExecutor {
private int maxPoolSize;
private int minIdleSize;
private int maxIdelTime;
private TOdbcTableType tableType;

public JdbcExecutor(byte[] thriftParams) throws Exception {
TJdbcExecutorCtorParams request = new TJdbcExecutorCtorParams();
@ -86,6 +89,7 @@ public class JdbcExecutor {
} catch (TException e) {
throw new InternalException(e.getMessage());
}
tableType = request.table_type;
minPoolSize = Integer.valueOf(System.getProperty("JDBC_MIN_POOL", "1"));
maxPoolSize = Integer.valueOf(System.getProperty("JDBC_MAX_POOL", "100"));
maxIdelTime = Integer.valueOf(System.getProperty("JDBC_MAX_IDEL_TIME", "300000"));
@ -98,6 +102,10 @@
request.jdbc_url, request.jdbc_user, request.jdbc_password, request.op, request.table_type);
}

public boolean isNebula() {
return tableType == TOdbcTableType.NEBULA;
}

public void close() throws Exception {
if (resultSet != null) {
resultSet.close();
@ -127,7 +135,9 @@ public class JdbcExecutor {
resultColumnTypeNames = new ArrayList<>(columnCount);
block = new ArrayList<>(columnCount);
for (int i = 0; i < columnCount; ++i) {
-   resultColumnTypeNames.add(resultSetMetaData.getColumnClassName(i + 1));
+   if (!isNebula()) {
+       resultColumnTypeNames.add(resultSetMetaData.getColumnClassName(i + 1));
+   }
    block.add((Object[]) Array.newInstance(Object.class, batchSizeNum));
}
return columnCount;
@ -200,7 +210,11 @@ public class JdbcExecutor {
curBlockRows = 0;
do {
    for (int i = 0; i < columnCount; ++i) {
-       block.get(i)[curBlockRows] = resultSet.getObject(i + 1);
+       if (isNebula()) {
+           block.get(i)[curBlockRows] = UdfUtils.convertObject((ValueWrapper) resultSet.getObject(i + 1));
+       } else {
+           block.get(i)[curBlockRows] = resultSet.getObject(i + 1);
+       }
    }
    curBlockRows++;
} while (curBlockRows < batchSize && resultSet.next());
@ -254,45 +268,52 @@ public class JdbcExecutor {
private void init(String driverUrl, String sql, int batchSize, String driverClass, String jdbcUrl, String jdbcUser,
String jdbcPassword, TJdbcOperation op, TOdbcTableType tableType) throws UdfRuntimeException {
try {
ClassLoader parent = getClass().getClassLoader();
ClassLoader classLoader = UdfUtils.getClassLoader(driverUrl, parent);
druidDataSource = JdbcDataSource.getDataSource().getSource(jdbcUrl + jdbcUser + jdbcPassword);
if (druidDataSource == null) {
DruidDataSource ds = new DruidDataSource();
ds.setDriverClassLoader(classLoader);
ds.setDriverClassName(driverClass);
ds.setUrl(jdbcUrl);
ds.setUsername(jdbcUser);
ds.setPassword(jdbcPassword);
ds.setMinIdle(minIdleSize);
ds.setInitialSize(minPoolSize);
ds.setMaxActive(maxPoolSize);
ds.setMaxWait(5000);
ds.setTestWhileIdle(true);
ds.setTestOnBorrow(false);
setValidationQuery(ds, tableType);
ds.setTimeBetweenEvictionRunsMillis(maxIdelTime / 5);
ds.setMinEvictableIdleTimeMillis(maxIdelTime);
druidDataSource = ds;
// here is a cache of datasource, which using the string(jdbcUrl + jdbcUser +
// jdbcPassword) as key.
// and the default datasource init = 1, min = 1, max = 100, if one of connection idle
// time greater than 10 minutes. then connection will be retrieved.
JdbcDataSource.getDataSource().putSource(jdbcUrl + jdbcUser + jdbcPassword, ds);
}
conn = druidDataSource.getConnection();
if (op == TJdbcOperation.READ) {
conn.setAutoCommit(false);
Preconditions.checkArgument(sql != null);
stmt = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
if (tableType == TOdbcTableType.MYSQL) {
stmt.setFetchSize(Integer.MIN_VALUE);
} else {
stmt.setFetchSize(batchSize);
}
if (isNebula()) {
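// For Nebula, skip the Druid connection pool: load the JDBC driver and connect directly through DriverManager.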
batchSizeNum = batchSize;
Class.forName(driverClass);
conn = DriverManager.getConnection(jdbcUrl, jdbcUser, jdbcPassword);
stmt = conn.prepareStatement(sql);
} else {
stmt = conn.createStatement();
ClassLoader parent = getClass().getClassLoader();
ClassLoader classLoader = UdfUtils.getClassLoader(driverUrl, parent);
druidDataSource = JdbcDataSource.getDataSource().getSource(jdbcUrl + jdbcUser + jdbcPassword);
if (druidDataSource == null) {
DruidDataSource ds = new DruidDataSource();
ds.setDriverClassLoader(classLoader);
ds.setDriverClassName(driverClass);
ds.setUrl(jdbcUrl);
ds.setUsername(jdbcUser);
ds.setPassword(jdbcPassword);
ds.setMinIdle(minIdleSize);
ds.setInitialSize(minPoolSize);
ds.setMaxActive(maxPoolSize);
ds.setMaxWait(5000);
ds.setTestWhileIdle(true);
ds.setTestOnBorrow(false);
setValidationQuery(ds, tableType);
ds.setTimeBetweenEvictionRunsMillis(maxIdelTime / 5);
ds.setMinEvictableIdleTimeMillis(maxIdelTime);
druidDataSource = ds;
// here is a cache of datasource, which using the string(jdbcUrl + jdbcUser +
// jdbcPassword) as key.
// and the default datasource init = 1, min = 1, max = 100, if one of connection idle
// time greater than 10 minutes. then connection will be retrieved.
JdbcDataSource.getDataSource().putSource(jdbcUrl + jdbcUser + jdbcPassword, ds);
}
conn = druidDataSource.getConnection();
if (op == TJdbcOperation.READ) {
conn.setAutoCommit(false);
Preconditions.checkArgument(sql != null);
stmt = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
if (tableType == TOdbcTableType.MYSQL) {
stmt.setFetchSize(Integer.MIN_VALUE);
} else {
stmt.setFetchSize(batchSize);
}
batchSizeNum = batchSize;
} else {
stmt = conn.createStatement();
}
}
} catch (MalformedURLException e) {
throw new UdfRuntimeException("MalformedURLException to load class about " + driverUrl, e);
@ -300,6 +321,8 @@ public class JdbcExecutor {
throw new UdfRuntimeException("Initialize datasource failed: ", e);
} catch (FileNotFoundException e) {
throw new UdfRuntimeException("FileNotFoundException failed: ", e);
} catch (Exception e) {
throw new UdfRuntimeException("Initialize datasource failed: ", e);
}
}

@ -29,6 +29,9 @@ import org.apache.doris.thrift.TTypeNode;

import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
import com.vesoft.nebula.client.graph.data.DateTimeWrapper;
import com.vesoft.nebula.client.graph.data.DateWrapper;
import com.vesoft.nebula.client.graph.data.ValueWrapper;
import org.apache.log4j.Logger;
import sun.misc.Unsafe;

@ -584,4 +587,56 @@ public class UdfUtils {
}
return bytes;
}

// Only used by nebula-graph: convert a returned ValueWrapper into an object that can be copied into the result block.
public static Object convertObject(ValueWrapper value) {
try {
if (value.isLong()) {
return value.asLong();
}
if (value.isBoolean()) {
return value.asBoolean();
}
if (value.isDouble()) {
return value.asDouble();
}
if (value.isString()) {
return value.asString();
}
if (value.isTime()) {
return value.asTime().toString();
}
if (value.isDate()) {
DateWrapper date = value.asDate();
return LocalDate.of(date.getYear(), date.getMonth(), date.getDay());
}
if (value.isDateTime()) {
DateTimeWrapper dateTime = value.asDateTime();
return LocalDateTime.of(dateTime.getYear(), dateTime.getMonth(), dateTime.getDay(),
dateTime.getHour(), dateTime.getMinute(), dateTime.getSecond(), dateTime.getMicrosec() * 1000);
}
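// Complex graph values (vertexes, edges, paths, collections) are rendered as strings; they map to varchar columns in Doris.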
if (value.isVertex()) {
return value.asNode().toString();
}
if (value.isEdge()) {
return value.asRelationship().toString();
}
if (value.isPath()) {
return value.asPath().toString();
}
if (value.isList()) {
return value.asList().toString();
}
if (value.isSet()) {
return value.asSet().toString();
}
if (value.isMap()) {
return value.asMap().toString();
}
return null;
} catch (Exception e) {
return null;
}
}
}

@ -1456,7 +1456,8 @@ visible_functions = [
[['esquery'], 'BOOLEAN', ['MAP', 'VARCHAR'], ''],
[['esquery'], 'BOOLEAN', ['STRING', 'VARCHAR'], ''],
[['esquery'], 'BOOLEAN', ['VARIANT', 'VARCHAR'], ''],

# used to accept a graph query (nGQL) string that is passed through to Nebula
[['g'], 'BOOLEAN', ['VARCHAR'], ''],
# String builtin functions
[['substr', 'substring'], 'VARCHAR', ['VARCHAR', 'INT'], 'ALWAYS_NULLABLE'],
[['substr', 'substring'], 'VARCHAR', ['VARCHAR', 'INT', 'INT'], 'ALWAYS_NULLABLE'],

@ -391,7 +391,8 @@ enum TOdbcTableType {
    TRINO,
    PRESTO,
    OCEANBASE,
-   OCEANBASE_ORACLE
+   OCEANBASE_ORACLE,
+   NEBULA
}

struct TJdbcExecutorCtorParams {
