[fix](jdbc) fix jdbc driver bug and external datasource p2 test case issue (#16033)
Fix bug that when create jdbc resource with only jdbc driver file name, it will failed to do checksum This is because we forgot the pass the full driver url to JdbcClient. Add ResultSet.FETCH_FORWARD and set AutoCommit to false to jdbc connection, so to avoid OOM when fetching large amount of data set useCursorFetch in jdbc url for both MySQL and PostgreSQL. Fix some p2 external datasource bug
This commit is contained in:
@ -189,10 +189,10 @@ public class JdbcResource extends Resource {
|
||||
// skip checking checksum when running ut
|
||||
return "";
|
||||
}
|
||||
String fullDriverPath = getRealDriverPath(driverPath);
|
||||
String fullDriverUrl = getFullDriverUrl(driverPath);
|
||||
InputStream inputStream = null;
|
||||
try {
|
||||
inputStream = Util.getInputStreamFromUrl(fullDriverPath, null, HTTP_TIMEOUT_MS, HTTP_TIMEOUT_MS);
|
||||
inputStream = Util.getInputStreamFromUrl(fullDriverUrl, null, HTTP_TIMEOUT_MS, HTTP_TIMEOUT_MS);
|
||||
MessageDigest digest = MessageDigest.getInstance("MD5");
|
||||
byte[] buf = new byte[4096];
|
||||
int bytesRead = 0;
|
||||
@ -213,7 +213,7 @@ public class JdbcResource extends Resource {
|
||||
}
|
||||
}
|
||||
|
||||
private static String getRealDriverPath(String driverUrl) {
|
||||
public static String getFullDriverUrl(String driverUrl) {
|
||||
try {
|
||||
URI uri = new URI(driverUrl);
|
||||
String schema = uri.getScheme();
|
||||
@ -254,6 +254,8 @@ public class JdbcResource extends Resource {
|
||||
// it will convert to Doris tinyint, not bit.
|
||||
newJdbcUrl = checkJdbcUrlParam(newJdbcUrl, "yearIsDateType", "true", "false");
|
||||
newJdbcUrl = checkJdbcUrlParam(newJdbcUrl, "tinyInt1isBit", "true", "false");
|
||||
}
|
||||
if (dbType.equals(MYSQL) || dbType.equals(POSTGRESQL)) {
|
||||
newJdbcUrl = checkJdbcUrlParam(newJdbcUrl, "useCursorFetch", "false", "true");
|
||||
}
|
||||
return newJdbcUrl;
|
||||
|
||||
@ -50,29 +50,20 @@ public class JdbcClient {
|
||||
|
||||
private String dbType;
|
||||
private String jdbcUser;
|
||||
private String jdbcPasswd;
|
||||
private String jdbcUrl;
|
||||
private String driverUrl;
|
||||
private String driverClass;
|
||||
|
||||
private URLClassLoader classLoader = null;
|
||||
|
||||
private HikariDataSource dataSource = null;
|
||||
|
||||
|
||||
public JdbcClient(String user, String password, String jdbcUrl, String driverUrl, String driverClass) {
|
||||
this.jdbcUser = user;
|
||||
this.jdbcPasswd = password;
|
||||
this.jdbcUrl = jdbcUrl;
|
||||
this.dbType = parseDbType(jdbcUrl);
|
||||
this.driverUrl = driverUrl;
|
||||
this.driverClass = driverClass;
|
||||
|
||||
ClassLoader oldClassLoader = Thread.currentThread().getContextClassLoader();
|
||||
try {
|
||||
// TODO(ftw): The problem here is that the jar package is handled by FE
|
||||
// and URLClassLoader may load the jar package directly into memory
|
||||
URL[] urls = {new URL(driverUrl)};
|
||||
URL[] urls = {new URL(JdbcResource.getFullDriverUrl(driverUrl))};
|
||||
// set parent ClassLoader to null, we can achieve class loading isolation.
|
||||
classLoader = URLClassLoader.newInstance(urls, null);
|
||||
Thread.currentThread().setContextClassLoader(classLoader);
|
||||
@ -80,7 +71,7 @@ public class JdbcClient {
|
||||
config.setDriverClassName(driverClass);
|
||||
config.setJdbcUrl(jdbcUrl);
|
||||
config.setUsername(jdbcUser);
|
||||
config.setPassword(jdbcPasswd);
|
||||
config.setPassword(password);
|
||||
config.setMaximumPoolSize(1);
|
||||
dataSource = new HikariDataSource(config);
|
||||
} catch (MalformedURLException e) {
|
||||
|
||||
@ -249,8 +249,10 @@ public class JdbcExecutor {
|
||||
dataSource = new HikariDataSource(config);
|
||||
conn = dataSource.getConnection();
|
||||
if (op == TJdbcOperation.READ) {
|
||||
conn.setAutoCommit(false);
|
||||
Preconditions.checkArgument(sql != null);
|
||||
stmt = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
|
||||
stmt = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY,
|
||||
ResultSet.FETCH_FORWARD);
|
||||
stmt.setFetchSize(batchSize);
|
||||
} else {
|
||||
stmt = conn.createStatement();
|
||||
@ -264,3 +266,4 @@ public class JdbcExecutor {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -237,7 +237,16 @@ http://minsk/odessages.yandex.ru/vorozhitelnosti/Armanada-yeni-boyiny_i_motot-bi
|
||||
|
||||
-- !06 --
|
||||
-10 5604
|
||||
0 1853
|
||||
-9 603
|
||||
-8 236
|
||||
-7 133
|
||||
-6 123
|
||||
-5 105
|
||||
5 82
|
||||
6 91
|
||||
7 102
|
||||
8 156
|
||||
9 222
|
||||
10 4291
|
||||
|
||||
-- !07 --
|
||||
|
||||
@ -38,9 +38,9 @@
|
||||
3
|
||||
|
||||
-- !q10 --
|
||||
2
|
||||
3
|
||||
4
|
||||
150000000
|
||||
149999999
|
||||
149999996
|
||||
|
||||
-- !q11 --
|
||||
1
|
||||
@ -48,6 +48,7 @@
|
||||
3
|
||||
|
||||
-- !q12 --
|
||||
2
|
||||
3
|
||||
4
|
||||
150000000
|
||||
149999999
|
||||
149999996
|
||||
|
||||
|
||||
@ -36,7 +36,7 @@ suite("test_external_catalog_es", "p2") {
|
||||
"elasticsearch.hosts"="http://${extEsHost}:${extEsPort}",
|
||||
"elasticsearch.nodes_discovery"="false",
|
||||
"elasticsearch.username"="${extEsUser}",
|
||||
"elasticsearch.username"="${extEsPassword}",
|
||||
"elasticsearch.username"="${extEsPassword}"
|
||||
);
|
||||
"""
|
||||
|
||||
|
||||
@ -44,10 +44,10 @@ suite("test_external_catalog_icebergv2", "p2") {
|
||||
}
|
||||
// test time travel stmt
|
||||
def q02 = {
|
||||
qt_q09 """ select c_custkey from customer for time as of '2022-12-27 10:21:36' limit 3 """
|
||||
qt_q10 """ select c_custkey from customer for time as of '2022-12-28 10:21:36' limit 3 """
|
||||
qt_q11 """ select c_custkey from customer for version as of 906874575350293177 limit 3 """
|
||||
qt_q12 """ select c_custkey from customer for version as of 6352416983354893547 limit 3 """
|
||||
qt_q09 """ select c_custkey from customer for time as of '2022-12-27 10:21:36' order by c_custkey limit 3 """
|
||||
qt_q10 """ select c_custkey from customer for time as of '2022-12-28 10:21:36' order by c_custkey desc limit 3 """
|
||||
qt_q11 """ select c_custkey from customer for version as of 906874575350293177 order by c_custkey limit 3 """
|
||||
qt_q12 """ select c_custkey from customer for version as of 6352416983354893547 order by c_custkey desc limit 3 """
|
||||
}
|
||||
sql """ use `tpch_1000_icebergv2`; """
|
||||
q01()
|
||||
|
||||
@ -35,6 +35,7 @@ suite("test_external_catalog_mysql", "p2") {
|
||||
sql """use ${mysqlDatabaseName01};"""
|
||||
|
||||
|
||||
sql """drop catalog if exists ${mysqlCatalogName};"""
|
||||
sql """drop resource if exists ${mysqlResource01};"""
|
||||
sql """
|
||||
CREATE RESOURCE ${mysqlResource01}
|
||||
@ -43,13 +44,11 @@ suite("test_external_catalog_mysql", "p2") {
|
||||
"user"="${extMysqlUser}",
|
||||
"password"="${extMysqlPassword}",
|
||||
"jdbc_url"="jdbc:mysql://${extMysqlHost}:${extMysqlPort}/ssb?useUnicode=true&characterEncoding=UTF-8&allowMultiQueries=true&serverTimezone=Asia/Shanghai&useSSL=false",
|
||||
"driver_url"="https://doris-community-test-1308700295.cos.ap-hongkong.myqcloud.com/jdbc_driver/mysql-connector-java-8.0.25.jar",
|
||||
"driver_url"="https://doris-community-bj-1308700295.cos.ap-beijing.myqcloud.com/jdbc_drivers/mysql-connector-java-8.0.25.jar",
|
||||
"driver_class"="com.mysql.cj.jdbc.Driver"
|
||||
);
|
||||
"""
|
||||
|
||||
sql """drop catalog if exists ${mysqlCatalogName};"""
|
||||
|
||||
sql """CREATE CATALOG ${mysqlCatalogName} WITH RESOURCE ${mysqlResource01};"""
|
||||
// sql """drop catalog if exists ${mysqlCatalogName};"""
|
||||
|
||||
|
||||
@ -44,7 +44,7 @@ suite("test_external_resource_mysql", "p2") {
|
||||
"user"="${extMysqlUser}",
|
||||
"password"="${extMysqlPassword}",
|
||||
"jdbc_url"="jdbc:mysql://${extMysqlHost}:${extMysqlPort}/ssb?useUnicode=true&characterEncoding=UTF-8&allowMultiQueries=true&serverTimezone=Asia/Shanghai&useSSL=false",
|
||||
"driver_url"="https://doris-community-test-1308700295.cos.ap-hongkong.myqcloud.com/jdbc_driver/mysql-connector-java-8.0.25.jar",
|
||||
"driver_url"="https://doris-community-bj-1308700295.cos.ap-beijing.myqcloud.com/jdbc_drivers/mysql-connector-java-8.0.25.jar",
|
||||
"driver_class"="com.mysql.cj.jdbc.Driver"
|
||||
);
|
||||
"""
|
||||
@ -126,7 +126,7 @@ suite("test_external_resource_mysql", "p2") {
|
||||
) ENGINE=JDBC
|
||||
PROPERTIES (
|
||||
"resource" = "${mysqlResourceName}",
|
||||
"table" = "customer",
|
||||
"table" = "supplier",
|
||||
"table_type"="mysql"
|
||||
);
|
||||
"""
|
||||
|
||||
@ -29,20 +29,20 @@ suite("test_external_pg", "p2") {
|
||||
String pgTableNameCustomer = "jdbc_pg_14_customer"
|
||||
String pgTableNameSupplier = "jdbc_pg_14_supplier"
|
||||
|
||||
|
||||
sql """drop database if exists ${jdbcPg14Database1};"""
|
||||
sql """drop resource if exists ${jdbcResourcePg14};"""
|
||||
|
||||
sql """drop database if exists ${jdbcPg14Database1};"""
|
||||
sql """create database ${jdbcPg14Database1};"""
|
||||
sql """use ${jdbcPg14Database1};"""
|
||||
sql """drop resource if exists ${jdbcResourcePg14};"""
|
||||
sql """
|
||||
create external resource ${jdbcResourcePg14}
|
||||
properties (
|
||||
"type"="jdbc",
|
||||
"user"="${extPgUser}",
|
||||
"password"="${extPgPassword}",
|
||||
"jdbc_url"="jdbc:postgresql://${extPgHost}:${extPgPort}/ssb?currentSchema=ssb",
|
||||
"driver_url"="https://doris-community-test-1308700295.cos.ap-hongkong.myqcloud.com/jdbc_driver/postgresql-42.5.0.jar",
|
||||
"jdbc_url"="jdbc:postgresql://${extPgHost}:${extPgPort}/ssb?currentSchema=ssb&useCursorFetch=true",
|
||||
"driver_url"="https://doris-community-bj-1308700295.cos.ap-beijing.myqcloud.com/jdbc_drivers/postgresql-42.5.0.jar",
|
||||
"driver_class"="org.postgresql.Driver"
|
||||
);
|
||||
"""
|
||||
@ -89,7 +89,7 @@ suite("test_external_pg", "p2") {
|
||||
PROPERTIES (
|
||||
"resource" = "${jdbcResourcePg14}",
|
||||
"table" = "customer",
|
||||
"table_type"="mysql"
|
||||
"table_type"="postgresql"
|
||||
);
|
||||
"""
|
||||
|
||||
@ -106,8 +106,8 @@ suite("test_external_pg", "p2") {
|
||||
) ENGINE=JDBC
|
||||
PROPERTIES (
|
||||
"resource" = "${jdbcResourcePg14}",
|
||||
"table" = "customer",
|
||||
"table_type"="mysql"
|
||||
"table" = "supplier",
|
||||
"table_type"="postgresql"
|
||||
);
|
||||
"""
|
||||
|
||||
@ -126,12 +126,5 @@ suite("test_external_pg", "p2") {
|
||||
def res4 = sql """select * from ${pgTableNameCustomer} a join ${pgTableNameSupplier} b on a.c_nation =b.s_nation limit 5;"""
|
||||
logger.info("recoding select: " + res4.toString())
|
||||
|
||||
sql """drop table if exists ${pgTableNameLineOrder}"""
|
||||
sql """drop table if exists ${pgTableNameCustomer}"""
|
||||
sql """drop table if exists ${pgTableNameSupplier}"""
|
||||
|
||||
sql """drop database if exists ${jdbcPg14Database1};"""
|
||||
sql """drop resource if exists ${jdbcResourcePg14};"""
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user