From 4035bd83c366f287ca3283c8034eccb2f9e27f56 Mon Sep 17 00:00:00 2001 From: Mingyu Chen Date: Wed, 18 Jan 2023 17:48:06 +0800 Subject: [PATCH] [fix](jdbc) fix jdbc driver bug and external datasource p2 test case issue (#16033) Fix bug that when create jdbc resource with only jdbc driver file name, it will failed to do checksum This is because we forgot the pass the full driver url to JdbcClient. Add ResultSet.FETCH_FORWARD and set AutoCommit to false to jdbc connection, so to avoid OOM when fetching large amount of data set useCursorFetch in jdbc url for both MySQL and PostgreSQL. Fix some p2 external datasource bug --- .../apache/doris/catalog/JdbcResource.java | 8 ++++--- .../doris/external/jdbc/JdbcClient.java | 13 ++---------- .../org/apache/doris/udf/JdbcExecutor.java | 5 ++++- .../hive/test_external_yandex_nereids.out | 11 +++++++++- .../test_external_catalog_icebergv2.out | 13 ++++++------ .../es/test_external_catalog_es.groovy | 2 +- .../test_external_catalog_icebergv2.groovy | 8 +++---- .../mysql/test_external_catalog_mysql.groovy | 5 ++--- .../mysql/test_external_resource_mysql.groovy | 4 ++-- .../pg/test_external_pg.groovy | 21 +++++++------------ 10 files changed, 44 insertions(+), 46 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcResource.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcResource.java index add3009de8..afdd099702 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcResource.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcResource.java @@ -189,10 +189,10 @@ public class JdbcResource extends Resource { // skip checking checksum when running ut return ""; } - String fullDriverPath = getRealDriverPath(driverPath); + String fullDriverUrl = getFullDriverUrl(driverPath); InputStream inputStream = null; try { - inputStream = Util.getInputStreamFromUrl(fullDriverPath, null, HTTP_TIMEOUT_MS, HTTP_TIMEOUT_MS); + inputStream = Util.getInputStreamFromUrl(fullDriverUrl, null, HTTP_TIMEOUT_MS, HTTP_TIMEOUT_MS); MessageDigest digest = MessageDigest.getInstance("MD5"); byte[] buf = new byte[4096]; int bytesRead = 0; @@ -213,7 +213,7 @@ public class JdbcResource extends Resource { } } - private static String getRealDriverPath(String driverUrl) { + public static String getFullDriverUrl(String driverUrl) { try { URI uri = new URI(driverUrl); String schema = uri.getScheme(); @@ -254,6 +254,8 @@ public class JdbcResource extends Resource { // it will convert to Doris tinyint, not bit. newJdbcUrl = checkJdbcUrlParam(newJdbcUrl, "yearIsDateType", "true", "false"); newJdbcUrl = checkJdbcUrlParam(newJdbcUrl, "tinyInt1isBit", "true", "false"); + } + if (dbType.equals(MYSQL) || dbType.equals(POSTGRESQL)) { newJdbcUrl = checkJdbcUrlParam(newJdbcUrl, "useCursorFetch", "false", "true"); } return newJdbcUrl; diff --git a/fe/fe-core/src/main/java/org/apache/doris/external/jdbc/JdbcClient.java b/fe/fe-core/src/main/java/org/apache/doris/external/jdbc/JdbcClient.java index cc8fc80b3d..6f420246c5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/external/jdbc/JdbcClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/external/jdbc/JdbcClient.java @@ -50,29 +50,20 @@ public class JdbcClient { private String dbType; private String jdbcUser; - private String jdbcPasswd; - private String jdbcUrl; - private String driverUrl; - private String driverClass; private URLClassLoader classLoader = null; private HikariDataSource dataSource = null; - public JdbcClient(String user, String password, String jdbcUrl, String driverUrl, String driverClass) { this.jdbcUser = user; - this.jdbcPasswd = password; - this.jdbcUrl = jdbcUrl; this.dbType = parseDbType(jdbcUrl); - this.driverUrl = driverUrl; - this.driverClass = driverClass; ClassLoader oldClassLoader = Thread.currentThread().getContextClassLoader(); try { // TODO(ftw): The problem here is that the jar package is handled by FE // and URLClassLoader may load the jar package directly into memory - URL[] urls = {new URL(driverUrl)}; + URL[] urls = {new URL(JdbcResource.getFullDriverUrl(driverUrl))}; // set parent ClassLoader to null, we can achieve class loading isolation. classLoader = URLClassLoader.newInstance(urls, null); Thread.currentThread().setContextClassLoader(classLoader); @@ -80,7 +71,7 @@ public class JdbcClient { config.setDriverClassName(driverClass); config.setJdbcUrl(jdbcUrl); config.setUsername(jdbcUser); - config.setPassword(jdbcPasswd); + config.setPassword(password); config.setMaximumPoolSize(1); dataSource = new HikariDataSource(config); } catch (MalformedURLException e) { diff --git a/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java b/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java index f7273e3718..7f2366b948 100644 --- a/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java +++ b/fe/java-udf/src/main/java/org/apache/doris/udf/JdbcExecutor.java @@ -249,8 +249,10 @@ public class JdbcExecutor { dataSource = new HikariDataSource(config); conn = dataSource.getConnection(); if (op == TJdbcOperation.READ) { + conn.setAutoCommit(false); Preconditions.checkArgument(sql != null); - stmt = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); + stmt = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY, + ResultSet.FETCH_FORWARD); stmt.setFetchSize(batchSize); } else { stmt = conn.createStatement(); @@ -264,3 +266,4 @@ public class JdbcExecutor { } } } + diff --git a/regression-test/data/external_table_emr_p2/hive/test_external_yandex_nereids.out b/regression-test/data/external_table_emr_p2/hive/test_external_yandex_nereids.out index dc56cd16a2..004cdfedd4 100644 --- a/regression-test/data/external_table_emr_p2/hive/test_external_yandex_nereids.out +++ b/regression-test/data/external_table_emr_p2/hive/test_external_yandex_nereids.out @@ -237,7 +237,16 @@ http://minsk/odessages.yandex.ru/vorozhitelnosti/Armanada-yeni-boyiny_i_motot-bi -- !06 -- -10 5604 -0 1853 +-9 603 +-8 236 +-7 133 +-6 123 +-5 105 +5 82 +6 91 +7 102 +8 156 +9 222 10 4291 -- !07 -- diff --git a/regression-test/data/external_table_emr_p2/iceberg/test_external_catalog_icebergv2.out b/regression-test/data/external_table_emr_p2/iceberg/test_external_catalog_icebergv2.out index dd7d5cb2d5..e7158ffd36 100644 --- a/regression-test/data/external_table_emr_p2/iceberg/test_external_catalog_icebergv2.out +++ b/regression-test/data/external_table_emr_p2/iceberg/test_external_catalog_icebergv2.out @@ -38,9 +38,9 @@ 3 -- !q10 -- -2 -3 -4 +150000000 +149999999 +149999996 -- !q11 -- 1 @@ -48,6 +48,7 @@ 3 -- !q12 -- -2 -3 -4 +150000000 +149999999 +149999996 + diff --git a/regression-test/suites/external_table_emr_p2/es/test_external_catalog_es.groovy b/regression-test/suites/external_table_emr_p2/es/test_external_catalog_es.groovy index 1234d22d51..ed7f853c97 100644 --- a/regression-test/suites/external_table_emr_p2/es/test_external_catalog_es.groovy +++ b/regression-test/suites/external_table_emr_p2/es/test_external_catalog_es.groovy @@ -36,7 +36,7 @@ suite("test_external_catalog_es", "p2") { "elasticsearch.hosts"="http://${extEsHost}:${extEsPort}", "elasticsearch.nodes_discovery"="false", "elasticsearch.username"="${extEsUser}", - "elasticsearch.username"="${extEsPassword}", + "elasticsearch.username"="${extEsPassword}" ); """ diff --git a/regression-test/suites/external_table_emr_p2/iceberg/test_external_catalog_icebergv2.groovy b/regression-test/suites/external_table_emr_p2/iceberg/test_external_catalog_icebergv2.groovy index 765bb789d5..3b70f1936e 100644 --- a/regression-test/suites/external_table_emr_p2/iceberg/test_external_catalog_icebergv2.groovy +++ b/regression-test/suites/external_table_emr_p2/iceberg/test_external_catalog_icebergv2.groovy @@ -44,10 +44,10 @@ suite("test_external_catalog_icebergv2", "p2") { } // test time travel stmt def q02 = { - qt_q09 """ select c_custkey from customer for time as of '2022-12-27 10:21:36' limit 3 """ - qt_q10 """ select c_custkey from customer for time as of '2022-12-28 10:21:36' limit 3 """ - qt_q11 """ select c_custkey from customer for version as of 906874575350293177 limit 3 """ - qt_q12 """ select c_custkey from customer for version as of 6352416983354893547 limit 3 """ + qt_q09 """ select c_custkey from customer for time as of '2022-12-27 10:21:36' order by c_custkey limit 3 """ + qt_q10 """ select c_custkey from customer for time as of '2022-12-28 10:21:36' order by c_custkey desc limit 3 """ + qt_q11 """ select c_custkey from customer for version as of 906874575350293177 order by c_custkey limit 3 """ + qt_q12 """ select c_custkey from customer for version as of 6352416983354893547 order by c_custkey desc limit 3 """ } sql """ use `tpch_1000_icebergv2`; """ q01() diff --git a/regression-test/suites/external_table_emr_p2/mysql/test_external_catalog_mysql.groovy b/regression-test/suites/external_table_emr_p2/mysql/test_external_catalog_mysql.groovy index a411926d99..b938f8c8ed 100644 --- a/regression-test/suites/external_table_emr_p2/mysql/test_external_catalog_mysql.groovy +++ b/regression-test/suites/external_table_emr_p2/mysql/test_external_catalog_mysql.groovy @@ -35,6 +35,7 @@ suite("test_external_catalog_mysql", "p2") { sql """use ${mysqlDatabaseName01};""" + sql """drop catalog if exists ${mysqlCatalogName};""" sql """drop resource if exists ${mysqlResource01};""" sql """ CREATE RESOURCE ${mysqlResource01} @@ -43,13 +44,11 @@ suite("test_external_catalog_mysql", "p2") { "user"="${extMysqlUser}", "password"="${extMysqlPassword}", "jdbc_url"="jdbc:mysql://${extMysqlHost}:${extMysqlPort}/ssb?useUnicode=true&characterEncoding=UTF-8&allowMultiQueries=true&serverTimezone=Asia/Shanghai&useSSL=false", - "driver_url"="https://doris-community-test-1308700295.cos.ap-hongkong.myqcloud.com/jdbc_driver/mysql-connector-java-8.0.25.jar", + "driver_url"="https://doris-community-bj-1308700295.cos.ap-beijing.myqcloud.com/jdbc_drivers/mysql-connector-java-8.0.25.jar", "driver_class"="com.mysql.cj.jdbc.Driver" ); """ - sql """drop catalog if exists ${mysqlCatalogName};""" - sql """CREATE CATALOG ${mysqlCatalogName} WITH RESOURCE ${mysqlResource01};""" // sql """drop catalog if exists ${mysqlCatalogName};""" diff --git a/regression-test/suites/external_table_emr_p2/mysql/test_external_resource_mysql.groovy b/regression-test/suites/external_table_emr_p2/mysql/test_external_resource_mysql.groovy index 86d432d966..ed0ece15fb 100644 --- a/regression-test/suites/external_table_emr_p2/mysql/test_external_resource_mysql.groovy +++ b/regression-test/suites/external_table_emr_p2/mysql/test_external_resource_mysql.groovy @@ -44,7 +44,7 @@ suite("test_external_resource_mysql", "p2") { "user"="${extMysqlUser}", "password"="${extMysqlPassword}", "jdbc_url"="jdbc:mysql://${extMysqlHost}:${extMysqlPort}/ssb?useUnicode=true&characterEncoding=UTF-8&allowMultiQueries=true&serverTimezone=Asia/Shanghai&useSSL=false", - "driver_url"="https://doris-community-test-1308700295.cos.ap-hongkong.myqcloud.com/jdbc_driver/mysql-connector-java-8.0.25.jar", + "driver_url"="https://doris-community-bj-1308700295.cos.ap-beijing.myqcloud.com/jdbc_drivers/mysql-connector-java-8.0.25.jar", "driver_class"="com.mysql.cj.jdbc.Driver" ); """ @@ -126,7 +126,7 @@ suite("test_external_resource_mysql", "p2") { ) ENGINE=JDBC PROPERTIES ( "resource" = "${mysqlResourceName}", - "table" = "customer", + "table" = "supplier", "table_type"="mysql" ); """ diff --git a/regression-test/suites/external_table_emr_p2/pg/test_external_pg.groovy b/regression-test/suites/external_table_emr_p2/pg/test_external_pg.groovy index e39fb44f76..b165f01805 100644 --- a/regression-test/suites/external_table_emr_p2/pg/test_external_pg.groovy +++ b/regression-test/suites/external_table_emr_p2/pg/test_external_pg.groovy @@ -29,20 +29,20 @@ suite("test_external_pg", "p2") { String pgTableNameCustomer = "jdbc_pg_14_customer" String pgTableNameSupplier = "jdbc_pg_14_supplier" - + sql """drop database if exists ${jdbcPg14Database1};""" + sql """drop resource if exists ${jdbcResourcePg14};""" sql """drop database if exists ${jdbcPg14Database1};""" sql """create database ${jdbcPg14Database1};""" sql """use ${jdbcPg14Database1};""" - sql """drop resource if exists ${jdbcResourcePg14};""" sql """ create external resource ${jdbcResourcePg14} properties ( "type"="jdbc", "user"="${extPgUser}", "password"="${extPgPassword}", - "jdbc_url"="jdbc:postgresql://${extPgHost}:${extPgPort}/ssb?currentSchema=ssb", - "driver_url"="https://doris-community-test-1308700295.cos.ap-hongkong.myqcloud.com/jdbc_driver/postgresql-42.5.0.jar", + "jdbc_url"="jdbc:postgresql://${extPgHost}:${extPgPort}/ssb?currentSchema=ssb&useCursorFetch=true", + "driver_url"="https://doris-community-bj-1308700295.cos.ap-beijing.myqcloud.com/jdbc_drivers/postgresql-42.5.0.jar", "driver_class"="org.postgresql.Driver" ); """ @@ -89,7 +89,7 @@ suite("test_external_pg", "p2") { PROPERTIES ( "resource" = "${jdbcResourcePg14}", "table" = "customer", - "table_type"="mysql" + "table_type"="postgresql" ); """ @@ -106,8 +106,8 @@ suite("test_external_pg", "p2") { ) ENGINE=JDBC PROPERTIES ( "resource" = "${jdbcResourcePg14}", - "table" = "customer", - "table_type"="mysql" + "table" = "supplier", + "table_type"="postgresql" ); """ @@ -126,12 +126,5 @@ suite("test_external_pg", "p2") { def res4 = sql """select * from ${pgTableNameCustomer} a join ${pgTableNameSupplier} b on a.c_nation =b.s_nation limit 5;""" logger.info("recoding select: " + res4.toString()) - sql """drop table if exists ${pgTableNameLineOrder}""" - sql """drop table if exists ${pgTableNameCustomer}""" - sql """drop table if exists ${pgTableNameSupplier}""" - - sql """drop database if exists ${jdbcPg14Database1};""" - sql """drop resource if exists ${jdbcResourcePg14};""" - } }