From 8fc9c18c850a94a02062b1848becbdc6ca9eb888 Mon Sep 17 00:00:00 2001 From: zy-kkk Date: Mon, 8 Jan 2024 18:21:57 +0800 Subject: [PATCH] [improvement](jdbc catalog) Put the jdbc connection pool parameters into catalog properties (#29195) --- be/src/runtime/descriptors.cpp | 14 +- be/src/runtime/descriptors.h | 10 ++ be/src/vec/exec/scan/new_jdbc_scanner.cpp | 5 + be/src/vec/exec/vjdbc_connector.cpp | 5 + be/src/vec/exec/vjdbc_connector.h | 5 + be/src/vec/sink/writer/vjdbc_table_writer.cpp | 5 + bin/start_be.sh | 5 +- conf/be.conf | 4 +- .../org/apache/doris/jdbc/JdbcDataSource.java | 5 - .../doris/jdbc/JdbcDataSourceConfig.java | 169 ++++++++++++++++++ .../org/apache/doris/jdbc/JdbcExecutor.java | 93 +++++----- .../apache/doris/catalog/JdbcResource.java | 17 +- .../org/apache/doris/catalog/JdbcTable.java | 31 ++++ .../catalog/external/JdbcExternalTable.java | 5 + .../datasource/jdbc/JdbcExternalCatalog.java | 28 ++- .../datasource/jdbc/client/JdbcClient.java | 37 ++-- .../jdbc/client/JdbcClientConfig.java | 61 +++++++ .../planner/external/jdbc/JdbcTableSink.java | 28 +-- gensrc/thrift/Descriptors.thrift | 6 +- gensrc/thrift/Types.thrift | 6 + 20 files changed, 440 insertions(+), 99 deletions(-) create mode 100644 fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcDataSourceConfig.java diff --git a/be/src/runtime/descriptors.cpp b/be/src/runtime/descriptors.cpp index 09113f85ee..1f70c8e281 100644 --- a/be/src/runtime/descriptors.cpp +++ b/be/src/runtime/descriptors.cpp @@ -265,17 +265,25 @@ JdbcTableDescriptor::JdbcTableDescriptor(const TTableDescriptor& tdesc) _jdbc_url(tdesc.jdbcTable.jdbc_url), _jdbc_table_name(tdesc.jdbcTable.jdbc_table_name), _jdbc_user(tdesc.jdbcTable.jdbc_user), - _jdbc_passwd(tdesc.jdbcTable.jdbc_password) {} + _jdbc_passwd(tdesc.jdbcTable.jdbc_password), + _jdbc_min_pool_size(tdesc.jdbcTable.jdbc_min_pool_size), + _jdbc_max_pool_size(tdesc.jdbcTable.jdbc_max_pool_size), + 
_jdbc_max_idle_time(tdesc.jdbcTable.jdbc_max_idle_time), + _jdbc_max_wait_time(tdesc.jdbcTable.jdbc_max_wait_time), + _jdbc_keep_alive(tdesc.jdbcTable.jdbc_keep_alive) {} std::string JdbcTableDescriptor::debug_string() const { fmt::memory_buffer buf; fmt::format_to(buf, "JDBCTable({} ,_jdbc_resource_name={} ,_jdbc_driver_url={} " ",_jdbc_driver_class={} ,_jdbc_driver_checksum={} ,_jdbc_url={} " - ",_jdbc_table_name={} ,_jdbc_user={} ,_jdbc_passwd={})", + ",_jdbc_table_name={} ,_jdbc_user={} ,_jdbc_passwd={} ,_jdbc_min_pool_size={} " + ",_jdbc_max_pool_size={} ,_jdbc_max_idle_time={} ,_jdbc_max_wait_time={} " + ",_jdbc_keep_alive={})", TableDescriptor::debug_string(), _jdbc_resource_name, _jdbc_driver_url, _jdbc_driver_class, _jdbc_driver_checksum, _jdbc_url, _jdbc_table_name, - _jdbc_user, _jdbc_passwd); + _jdbc_user, _jdbc_passwd, _jdbc_min_pool_size, _jdbc_max_pool_size, + _jdbc_max_idle_time, _jdbc_max_wait_time, _jdbc_keep_alive); return fmt::to_string(buf); } diff --git a/be/src/runtime/descriptors.h b/be/src/runtime/descriptors.h index ca43bce833..42fa67a0f8 100644 --- a/be/src/runtime/descriptors.h +++ b/be/src/runtime/descriptors.h @@ -318,6 +318,11 @@ public: const std::string& jdbc_table_name() const { return _jdbc_table_name; } const std::string& jdbc_user() const { return _jdbc_user; } const std::string& jdbc_passwd() const { return _jdbc_passwd; } + int32_t jdbc_min_pool_size() const { return _jdbc_min_pool_size; } + int32_t jdbc_max_pool_size() const { return _jdbc_max_pool_size; } + int32_t jdbc_max_idle_time() const { return _jdbc_max_idle_time; } + int32_t jdbc_max_wait_time() const { return _jdbc_max_wait_time; } + bool jdbc_keep_alive() const { return _jdbc_keep_alive; } private: std::string _jdbc_resource_name; @@ -328,6 +333,11 @@ private: std::string _jdbc_table_name; std::string _jdbc_user; std::string _jdbc_passwd; + int32_t _jdbc_min_pool_size; + int32_t _jdbc_max_pool_size; + int32_t _jdbc_max_idle_time; + int32_t _jdbc_max_wait_time; + 
bool _jdbc_keep_alive; }; class TupleDescriptor { diff --git a/be/src/vec/exec/scan/new_jdbc_scanner.cpp b/be/src/vec/exec/scan/new_jdbc_scanner.cpp index 4f90edde3b..f403dad6c7 100644 --- a/be/src/vec/exec/scan/new_jdbc_scanner.cpp +++ b/be/src/vec/exec/scan/new_jdbc_scanner.cpp @@ -95,6 +95,11 @@ Status NewJdbcScanner::prepare(RuntimeState* state, const VExprContextSPtrs& con _jdbc_param.tuple_desc = _tuple_desc; _jdbc_param.query_string = std::move(_query_string); _jdbc_param.table_type = _table_type; + _jdbc_param.min_pool_size = jdbc_table->jdbc_min_pool_size(); + _jdbc_param.max_pool_size = jdbc_table->jdbc_max_pool_size(); + _jdbc_param.max_idle_time = jdbc_table->jdbc_max_idle_time(); + _jdbc_param.max_wait_time = jdbc_table->jdbc_max_wait_time(); + _jdbc_param.keep_alive = jdbc_table->jdbc_keep_alive(); if (get_parent() != nullptr) { get_parent()->_scanner_profile->add_info_string("JdbcDriverClass", diff --git a/be/src/vec/exec/vjdbc_connector.cpp b/be/src/vec/exec/vjdbc_connector.cpp index d3debc688d..56205d2ffa 100644 --- a/be/src/vec/exec/vjdbc_connector.cpp +++ b/be/src/vec/exec/vjdbc_connector.cpp @@ -143,6 +143,11 @@ Status JdbcConnector::open(RuntimeState* state, bool read) { ctor_params.__set_batch_size(read ? state->batch_size() : 0); ctor_params.__set_op(read ? TJdbcOperation::READ : TJdbcOperation::WRITE); ctor_params.__set_table_type(_conn_param.table_type); + ctor_params.__set_min_pool_size(_conn_param.min_pool_size); + ctor_params.__set_max_pool_size(_conn_param.max_pool_size); + ctor_params.__set_max_idle_time(_conn_param.max_idle_time); + ctor_params.__set_max_wait_time(_conn_param.max_wait_time); + ctor_params.__set_keep_alive(_conn_param.keep_alive); jbyteArray ctor_params_bytes; // Pushed frame will be popped when jni_frame goes out-of-scope. 
diff --git a/be/src/vec/exec/vjdbc_connector.h b/be/src/vec/exec/vjdbc_connector.h index ba4fb98e0e..5d8ac12132 100644 --- a/be/src/vec/exec/vjdbc_connector.h +++ b/be/src/vec/exec/vjdbc_connector.h @@ -54,6 +54,11 @@ struct JdbcConnectorParam { std::string table_name; bool use_transaction; TOdbcTableType::type table_type; + int32_t min_pool_size; + int32_t max_pool_size; + int32_t max_idle_time; + int32_t max_wait_time; + bool keep_alive; const TupleDescriptor* tuple_desc = nullptr; }; diff --git a/be/src/vec/sink/writer/vjdbc_table_writer.cpp b/be/src/vec/sink/writer/vjdbc_table_writer.cpp index a4805921ef..f7e4941892 100644 --- a/be/src/vec/sink/writer/vjdbc_table_writer.cpp +++ b/be/src/vec/sink/writer/vjdbc_table_writer.cpp @@ -46,6 +46,11 @@ JdbcConnectorParam VJdbcTableWriter::create_connect_param(const doris::TDataSink jdbc_param.query_string = t_jdbc_sink.insert_sql; jdbc_param.table_name = t_jdbc_sink.jdbc_table.jdbc_table_name; jdbc_param.use_transaction = t_jdbc_sink.use_transaction; + jdbc_param.min_pool_size = t_jdbc_sink.jdbc_table.jdbc_min_pool_size; + jdbc_param.max_pool_size = t_jdbc_sink.jdbc_table.jdbc_max_pool_size; + jdbc_param.max_idle_time = t_jdbc_sink.jdbc_table.jdbc_max_idle_time; + jdbc_param.max_wait_time = t_jdbc_sink.jdbc_table.jdbc_max_wait_time; + jdbc_param.keep_alive = t_jdbc_sink.jdbc_table.jdbc_keep_alive; return jdbc_param; } diff --git a/bin/start_be.sh b/bin/start_be.sh index ebf52a05ff..434c06cfe3 100755 --- a/bin/start_be.sh +++ b/bin/start_be.sh @@ -303,16 +303,15 @@ java_version="$( CUR_DATE=$(date +%Y%m%d-%H%M%S) LOG_PATH="-DlogPath=${DORIS_HOME}/log/jni.log" COMMON_OPTS="-Dsun.java.command=DorisBE -XX:-CriticalJNINatives" -JDBC_OPTS="-DJDBC_MIN_POOL=1 -DJDBC_MAX_POOL=100 -DJDBC_MAX_IDLE_TIME=300000 -DJDBC_MAX_WAIT_TIME=5000" if [[ "${java_version}" -gt 8 ]]; then if [[ -z ${JAVA_OPTS_FOR_JDK_9} ]]; then - JAVA_OPTS_FOR_JDK_9="-Xmx1024m ${LOG_PATH} -Xlog:gc:${DORIS_HOME}/log/be.gc.log.${CUR_DATE} ${COMMON_OPTS} 
${JDBC_OPTS}" + JAVA_OPTS_FOR_JDK_9="-Xmx1024m ${LOG_PATH} -Xlog:gc:${DORIS_HOME}/log/be.gc.log.${CUR_DATE} ${COMMON_OPTS}" fi final_java_opt="${JAVA_OPTS_FOR_JDK_9}" else if [[ -z ${JAVA_OPTS} ]]; then - JAVA_OPTS="-Xmx1024m ${LOG_PATH} -Xloggc:${DORIS_HOME}/log/be.gc.log.${CUR_DATE} ${COMMON_OPTS} ${JDBC_OPTS}" + JAVA_OPTS="-Xmx1024m ${LOG_PATH} -Xloggc:${DORIS_HOME}/log/be.gc.log.${CUR_DATE} ${COMMON_OPTS}" fi final_java_opt="${JAVA_OPTS}" fi diff --git a/conf/be.conf b/conf/be.conf index 3f441ebf79..684830f2aa 100644 --- a/conf/be.conf +++ b/conf/be.conf @@ -19,10 +19,10 @@ CUR_DATE=`date +%Y%m%d-%H%M%S` PPROF_TMPDIR="$DORIS_HOME/log/" -JAVA_OPTS="-Xmx1024m -DlogPath=$DORIS_HOME/log/jni.log -Xloggc:$DORIS_HOME/log/be.gc.log.$CUR_DATE -Djavax.security.auth.useSubjectCredsOnly=false -Dsun.security.krb5.debug=true -Dsun.java.command=DorisBE -XX:-CriticalJNINatives -DJDBC_MIN_POOL=1 -DJDBC_MAX_POOL=100 -DJDBC_MAX_IDLE_TIME=300000 -DJDBC_MAX_WAIT_TIME=5000" +JAVA_OPTS="-Xmx1024m -DlogPath=$DORIS_HOME/log/jni.log -Xloggc:$DORIS_HOME/log/be.gc.log.$CUR_DATE -Djavax.security.auth.useSubjectCredsOnly=false -Dsun.security.krb5.debug=true -Dsun.java.command=DorisBE -XX:-CriticalJNINatives" # For jdk 9+, this JAVA_OPTS will be used as default JVM options -JAVA_OPTS_FOR_JDK_9="-Xmx1024m -DlogPath=$DORIS_HOME/log/jni.log -Xlog:gc:$DORIS_HOME/log/be.gc.log.$CUR_DATE -Djavax.security.auth.useSubjectCredsOnly=false -Dsun.security.krb5.debug=true -Dsun.java.command=DorisBE -XX:-CriticalJNINatives -DJDBC_MIN_POOL=1 -DJDBC_MAX_POOL=100 -DJDBC_MAX_IDLE_TIME=300000 -DJDBC_MAX_WAIT_TIME=5000" +JAVA_OPTS_FOR_JDK_9="-Xmx1024m -DlogPath=$DORIS_HOME/log/jni.log -Xlog:gc:$DORIS_HOME/log/be.gc.log.$CUR_DATE -Djavax.security.auth.useSubjectCredsOnly=false -Dsun.security.krb5.debug=true -Dsun.java.command=DorisBE -XX:-CriticalJNINatives" # since 1.2, the JAVA_HOME need to be set to run BE process. 
# JAVA_HOME=/path/to/jdk/ diff --git a/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcDataSource.java b/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcDataSource.java index 6a6a022d29..2fd0acf436 100644 --- a/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcDataSource.java +++ b/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcDataSource.java @@ -41,9 +41,4 @@ public class JdbcDataSource { public Map getSourcesMap() { return sourcesMap; } - - public String createCacheKey(String jdbcUrl, String jdbcUser, String jdbcPassword, String jdbcDriverUrl, - String jdbcDriverClass) { - return jdbcUrl + jdbcUser + jdbcPassword + jdbcDriverUrl + jdbcDriverClass; - } } diff --git a/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcDataSourceConfig.java b/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcDataSourceConfig.java new file mode 100644 index 0000000000..be32568bd2 --- /dev/null +++ b/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcDataSourceConfig.java @@ -0,0 +1,169 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.jdbc; + +import org.apache.doris.thrift.TJdbcOperation; +import org.apache.doris.thrift.TOdbcTableType; + +public class JdbcDataSourceConfig { + private String jdbcUrl; + private String jdbcUser; + private String jdbcPassword; + private String jdbcDriverUrl; + private String jdbcDriverClass; + private int batchSize; + private TJdbcOperation op; + private TOdbcTableType tableType; + private int minPoolSize; + private int maxPoolSize; + private int minIdleSize; + private int maxIdleTime; + private int maxWaitTime; + private boolean keepAlive; + + public String createCacheKey() { + return jdbcUrl + jdbcUser + jdbcPassword + jdbcDriverUrl + jdbcDriverClass + + minPoolSize + maxPoolSize + minIdleSize + maxIdleTime + maxWaitTime + keepAlive; + } + + public String getJdbcUrl() { + return jdbcUrl; + } + + public JdbcDataSourceConfig setJdbcUrl(String jdbcUrl) { + this.jdbcUrl = jdbcUrl; + return this; + } + + public String getJdbcUser() { + return jdbcUser; + } + + public JdbcDataSourceConfig setJdbcUser(String jdbcUser) { + this.jdbcUser = jdbcUser; + return this; + } + + public String getJdbcPassword() { + return jdbcPassword; + } + + public JdbcDataSourceConfig setJdbcPassword(String jdbcPassword) { + this.jdbcPassword = jdbcPassword; + return this; + } + + public String getJdbcDriverUrl() { + return jdbcDriverUrl; + } + + public JdbcDataSourceConfig setJdbcDriverUrl(String jdbcDriverUrl) { + this.jdbcDriverUrl = jdbcDriverUrl; + return this; + } + + public String getJdbcDriverClass() { + return jdbcDriverClass; + } + + public JdbcDataSourceConfig setJdbcDriverClass(String jdbcDriverClass) { + this.jdbcDriverClass = jdbcDriverClass; + return this; + } + + public int getBatchSize() { + return batchSize; + } + + public JdbcDataSourceConfig setBatchSize(int batchSize) { + this.batchSize = batchSize; + return this; + } + + public TJdbcOperation getOp() { + return op; + } + + public JdbcDataSourceConfig setOp(TJdbcOperation op) { + this.op = op; + 
return this; + } + + public TOdbcTableType getTableType() { + return tableType; + } + + public JdbcDataSourceConfig setTableType(TOdbcTableType tableType) { + this.tableType = tableType; + return this; + } + + public int getMinPoolSize() { + return minPoolSize; + } + + public JdbcDataSourceConfig setMinPoolSize(int minPoolSize) { + this.minPoolSize = minPoolSize; + return this; + } + + public int getMaxPoolSize() { + return maxPoolSize; + } + + public JdbcDataSourceConfig setMaxPoolSize(int maxPoolSize) { + this.maxPoolSize = maxPoolSize; + return this; + } + + public int getMinIdleSize() { + return minIdleSize; + } + + public JdbcDataSourceConfig setMinIdleSize(int minIdleSize) { + this.minIdleSize = minIdleSize; + return this; + } + + public int getMaxIdleTime() { + return maxIdleTime; + } + + public JdbcDataSourceConfig setMaxIdleTime(int maxIdleTime) { + this.maxIdleTime = maxIdleTime; + return this; + } + + public int getMaxWaitTime() { + return maxWaitTime; + } + + public JdbcDataSourceConfig setMaxWaitTime(int maxWaitTime) { + this.maxWaitTime = maxWaitTime; + return this; + } + + public boolean isKeepAlive() { + return keepAlive; + } + + public JdbcDataSourceConfig setKeepAlive(boolean keepAlive) { + this.keepAlive = keepAlive; + return this; + } +} diff --git a/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcExecutor.java b/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcExecutor.java index 9f98740aa6..ad34dd0035 100644 --- a/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcExecutor.java +++ b/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/JdbcExecutor.java @@ -93,12 +93,8 @@ public class JdbcExecutor { private static final byte[] emptyBytes = new byte[0]; private DruidDataSource druidDataSource = null; private byte[] druidDataSourceLock = new byte[0]; - private int minPoolSize; - private int maxPoolSize; - private int minIdleSize; - private int 
maxIdleTime; - private int maxWaitTime; private TOdbcTableType tableType; + private JdbcDataSourceConfig config; public JdbcExecutor(byte[] thriftParams) throws Exception { TJdbcExecutorCtorParams request = new TJdbcExecutorCtorParams(); @@ -109,18 +105,22 @@ public class JdbcExecutor { throw new InternalException(e.getMessage()); } tableType = request.table_type; - minPoolSize = Integer.valueOf(System.getProperty("JDBC_MIN_POOL", "1")); - maxPoolSize = Integer.valueOf(System.getProperty("JDBC_MAX_POOL", "100")); - maxIdleTime = Integer.valueOf(System.getProperty("JDBC_MAX_IDLE_TIME", "300000")); - maxWaitTime = Integer.valueOf(System.getProperty("JDBC_MAX_WAIT_TIME", "5000")); - minIdleSize = minPoolSize > 0 ? 1 : 0; - LOG.info("JdbcExecutor set minPoolSize = " + minPoolSize - + ", maxPoolSize = " + maxPoolSize - + ", maxIdleTime = " + maxIdleTime - + ", maxWaitTime = " + maxWaitTime - + ", minIdleSize = " + minIdleSize); - init(request.driver_path, request.statement, request.batch_size, request.jdbc_driver_class, - request.jdbc_url, request.jdbc_user, request.jdbc_password, request.op, request.table_type); + this.config = new JdbcDataSourceConfig() + .setJdbcUser(request.jdbc_user) + .setJdbcPassword(request.jdbc_password) + .setJdbcUrl(request.jdbc_url) + .setJdbcDriverUrl(request.driver_path) + .setJdbcDriverClass(request.jdbc_driver_class) + .setBatchSize(request.batch_size) + .setOp(request.op) + .setTableType(request.table_type) + .setMinPoolSize(request.min_pool_size) + .setMaxPoolSize(request.max_pool_size) + .setMaxIdleTime(request.max_idle_time) + .setMaxWaitTime(request.max_wait_time) + .setMinIdleSize(request.min_pool_size > 0 ? 
1 : 0) + .setKeepAlive(request.keep_alive); + init(config, request.statement); } public void close() throws Exception { @@ -133,7 +133,7 @@ public class JdbcExecutor { if (conn != null) { conn.close(); } - if (minIdleSize == 0) { + if (config.getMinIdleSize() == 0) { // it can be immediately closed if there is no need to maintain the cache of datasource druidDataSource.close(); JdbcDataSource.getDataSource().getSourcesMap().clear(); @@ -279,19 +279,18 @@ public class JdbcExecutor { } } - private void init(String driverUrl, String sql, int batchSize, String driverClass, String jdbcUrl, String jdbcUser, - String jdbcPassword, TJdbcOperation op, TOdbcTableType tableType) throws UdfRuntimeException { - String druidDataSourceKey = JdbcDataSource.getDataSource().createCacheKey(jdbcUrl, jdbcUser, jdbcPassword, - driverUrl, driverClass); + private void init(JdbcDataSourceConfig config, String sql) throws UdfRuntimeException { + String druidDataSourceKey = config.createCacheKey(); try { if (isNebula()) { - batchSizeNum = batchSize; - Class.forName(driverClass); - conn = DriverManager.getConnection(jdbcUrl, jdbcUser, jdbcPassword); + batchSizeNum = config.getBatchSize(); + Class.forName(config.getJdbcDriverClass()); + conn = DriverManager.getConnection(config.getJdbcUrl(), config.getJdbcUser(), + config.getJdbcPassword()); stmt = conn.prepareStatement(sql); } else { ClassLoader parent = getClass().getClassLoader(); - ClassLoader classLoader = UdfUtils.getClassLoader(driverUrl, parent); + ClassLoader classLoader = UdfUtils.getClassLoader(config.getJdbcDriverUrl(), parent); druidDataSource = JdbcDataSource.getDataSource().getSource(druidDataSourceKey); if (druidDataSource == null) { synchronized (druidDataSourceLock) { @@ -300,24 +299,31 @@ public class JdbcExecutor { long start = System.currentTimeMillis(); DruidDataSource ds = new DruidDataSource(); ds.setDriverClassLoader(classLoader); - ds.setDriverClassName(driverClass); - ds.setUrl(jdbcUrl); - 
ds.setUsername(jdbcUser); - ds.setPassword(jdbcPassword); - ds.setMinIdle(minIdleSize); - ds.setInitialSize(minPoolSize); - ds.setMaxActive(maxPoolSize); - ds.setMaxWait(maxWaitTime); + ds.setDriverClassName(config.getJdbcDriverClass()); + ds.setUrl(config.getJdbcUrl()); + ds.setUsername(config.getJdbcUser()); + ds.setPassword(config.getJdbcPassword()); + ds.setMinIdle(config.getMinIdleSize()); + ds.setInitialSize(config.getMinPoolSize()); + ds.setMaxActive(config.getMaxPoolSize()); + ds.setMaxWait(config.getMaxWaitTime()); ds.setTestWhileIdle(true); ds.setTestOnBorrow(false); - setValidationQuery(ds, tableType); - ds.setTimeBetweenEvictionRunsMillis(maxIdleTime / 5); - ds.setMinEvictableIdleTimeMillis(maxIdleTime); + setValidationQuery(ds, config.getTableType()); + ds.setTimeBetweenEvictionRunsMillis(config.getMaxIdleTime() / 5); + ds.setMinEvictableIdleTimeMillis(config.getMaxIdleTime()); + ds.setKeepAlive(config.isKeepAlive()); druidDataSource = ds; // and the default datasource init = 1, min = 1, max = 100, if one of connection idle // time greater than 10 minutes. then connection will be retrieved. 
JdbcDataSource.getDataSource().putSource(druidDataSourceKey, ds); - LOG.info("init datasource [" + (jdbcUrl + jdbcUser) + "] cost: " + ( + LOG.info("JdbcExecutor set minPoolSize = " + config.getMinPoolSize() + + ", maxPoolSize = " + config.getMaxPoolSize() + + ", maxIdleTime = " + config.getMaxIdleTime() + + ", maxWaitTime = " + config.getMaxWaitTime() + + ", minIdleSize = " + config.getMinIdleSize() + + ", keepAlive = " + config.isKeepAlive()); + LOG.info("init datasource [" + (config.getJdbcUrl() + config.getJdbcUser()) + "] cost: " + ( System.currentTimeMillis() - start) + " ms"); } } @@ -325,25 +331,26 @@ public class JdbcExecutor { long start = System.currentTimeMillis(); conn = druidDataSource.getConnection(); - LOG.info("get connection [" + (jdbcUrl + jdbcUser) + "] cost: " + (System.currentTimeMillis() - start) + LOG.info("get connection [" + (config.getJdbcUrl() + config.getJdbcUser()) + "] cost: " + ( + System.currentTimeMillis() - start) + " ms"); - if (op == TJdbcOperation.READ) { + if (config.getOp() == TJdbcOperation.READ) { conn.setAutoCommit(false); Preconditions.checkArgument(sql != null); stmt = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); if (tableType == TOdbcTableType.MYSQL) { stmt.setFetchSize(Integer.MIN_VALUE); } else { - stmt.setFetchSize(batchSize); + stmt.setFetchSize(config.getBatchSize()); } - batchSizeNum = batchSize; + batchSizeNum = config.getBatchSize(); } else { LOG.info("insert sql: " + sql); preparedStatement = conn.prepareStatement(sql); } } } catch (MalformedURLException e) { - throw new UdfRuntimeException("MalformedURLException to load class about " + driverUrl, e); + throw new UdfRuntimeException("MalformedURLException to load class about " + config.getJdbcDriverUrl(), e); } catch (SQLException e) { throw new UdfRuntimeException("Initialize datasource failed: ", e); } catch (FileNotFoundException e) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcResource.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcResource.java index da4f90d2a7..ce0805bfb7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcResource.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcResource.java @@ -94,6 +94,11 @@ public class JdbcResource extends Resource { public static final String TYPE = "type"; public static final String ONLY_SPECIFIED_DATABASE = "only_specified_database"; public static final String LOWER_CASE_TABLE_NAMES = "lower_case_table_names"; + public static final String MIN_POOL_SIZE = "min_pool_size"; + public static final String MAX_POOL_SIZE = "max_pool_size"; + public static final String MAX_IDLE_TIME = "max_idle_time"; + public static final String MAX_WAIT_TIME = "max_wait_time"; + public static final String KEEP_ALIVE = "keep_alive"; public static final String CHECK_SUM = "checksum"; private static final ImmutableList ALL_PROPERTIES = new ImmutableList.Builder().add( JDBC_URL, @@ -111,7 +116,12 @@ public class JdbcResource extends Resource { ONLY_SPECIFIED_DATABASE, LOWER_CASE_TABLE_NAMES, INCLUDE_DATABASE_LIST, - EXCLUDE_DATABASE_LIST + EXCLUDE_DATABASE_LIST, + MIN_POOL_SIZE, + MAX_POOL_SIZE, + MAX_IDLE_TIME, + MAX_WAIT_TIME, + KEEP_ALIVE ).build(); // The default value of optional properties @@ -123,6 +133,11 @@ public class JdbcResource extends Resource { OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(LOWER_CASE_TABLE_NAMES, "false"); OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(INCLUDE_DATABASE_LIST, ""); OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(EXCLUDE_DATABASE_LIST, ""); + OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(MIN_POOL_SIZE, "1"); + OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(MAX_POOL_SIZE, "100"); + OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(MAX_IDLE_TIME, "300000"); + OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(MAX_WAIT_TIME, "5000"); + OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(KEEP_ALIVE, "false"); } // timeout for both connection and read. 10 seconds is long enough. 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcTable.java index 6dfd7ffc68..06d42b158f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcTable.java @@ -81,6 +81,12 @@ public class JdbcTable extends Table { private String driverUrl; private String checkSum; + private int minPoolSize = 1; + private int maxPoolSize = 100; + private int maxIdleTime = 300000; + private int maxWaitTime = 5000; + private boolean keepAlive = false; + static { Map tempMap = new CaseInsensitiveMap(); tempMap.put("nebula", TOdbcTableType.NEBULA); @@ -163,6 +169,26 @@ public class JdbcTable extends Table { return getFromJdbcResourceOrDefault(JdbcResource.DRIVER_URL, driverUrl); } + public int getMinPoolSize() { + return Integer.parseInt(getFromJdbcResourceOrDefault(JdbcResource.MIN_POOL_SIZE, String.valueOf(minPoolSize))); + } + + public int getMaxPoolSize() { + return Integer.parseInt(getFromJdbcResourceOrDefault(JdbcResource.MAX_POOL_SIZE, String.valueOf(maxPoolSize))); + } + + public int getMaxIdleTime() { + return Integer.parseInt(getFromJdbcResourceOrDefault(JdbcResource.MAX_IDLE_TIME, String.valueOf(maxIdleTime))); + } + + public int getMaxWaitTime() { + return Integer.parseInt(getFromJdbcResourceOrDefault(JdbcResource.MAX_WAIT_TIME, String.valueOf(maxWaitTime))); + } + + public boolean getKeepAlive() { + return Boolean.parseBoolean(getFromJdbcResourceOrDefault(JdbcResource.KEEP_ALIVE, String.valueOf(keepAlive))); + } + private String getFromJdbcResourceOrDefault(String key, String defaultVal) { if (Strings.isNullOrEmpty(resourceName)) { return defaultVal; @@ -185,6 +211,11 @@ public class JdbcTable extends Table { tJdbcTable.setJdbcDriverUrl(getDriverUrl()); tJdbcTable.setJdbcResourceName(resourceName); tJdbcTable.setJdbcDriverChecksum(checkSum); + tJdbcTable.setJdbcMinPoolSize(getMinPoolSize()); + 
tJdbcTable.setJdbcMaxPoolSize(getMaxPoolSize()); + tJdbcTable.setJdbcMaxIdleTime(getMaxIdleTime()); + tJdbcTable.setJdbcMaxWaitTime(getMaxWaitTime()); + tJdbcTable.setJdbcKeepAlive(getKeepAlive()); TTableDescriptor tTableDescriptor = new TTableDescriptor(getId(), TTableType.JDBC_TABLE, fullSchema.size(), 0, getName(), ""); tTableDescriptor.setJdbcTable(tJdbcTable); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/JdbcExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/JdbcExternalTable.java index 5a423ed027..7d584477f4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/external/JdbcExternalTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/external/JdbcExternalTable.java @@ -101,6 +101,11 @@ public class JdbcExternalTable extends ExternalTable { jdbcTable.setDriverUrl(jdbcCatalog.getDriverUrl()); jdbcTable.setResourceName(jdbcCatalog.getResource()); jdbcTable.setCheckSum(jdbcCatalog.getCheckSum()); + jdbcTable.setMinPoolSize(jdbcCatalog.getMinPoolSize()); + jdbcTable.setMaxPoolSize(jdbcCatalog.getMaxPoolSize()); + jdbcTable.setMaxIdleTime(jdbcCatalog.getMaxIdleTime()); + jdbcTable.setMaxWaitTime(jdbcCatalog.getMaxWaitTime()); + jdbcTable.setKeepAlive(jdbcCatalog.getKeepAlive()); return jdbcTable; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java index aa0704e964..c5b33f7200 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java @@ -143,6 +143,26 @@ public class JdbcExternalCatalog extends ExternalCatalog { return catalogProperty.getOrDefault(JdbcResource.LOWER_CASE_TABLE_NAMES, "false"); } + public int getMinPoolSize() { + return Integer.parseInt(catalogProperty.getOrDefault(JdbcResource.MIN_POOL_SIZE, "1")); + } + + public int 
getMaxPoolSize() { + return Integer.parseInt(catalogProperty.getOrDefault(JdbcResource.MAX_POOL_SIZE, "100")); + } + + public int getMaxIdleTime() { + return Integer.parseInt(catalogProperty.getOrDefault(JdbcResource.MAX_IDLE_TIME, "300000")); + } + + public int getMaxWaitTime() { + return Integer.parseInt(catalogProperty.getOrDefault(JdbcResource.MAX_WAIT_TIME, "5000")); + } + + public boolean getKeepAlive() { + return Boolean.parseBoolean(catalogProperty.getOrDefault(JdbcResource.KEEP_ALIVE, "false")); + } + @Override protected void initLocalObjectsImpl() { JdbcClientConfig jdbcClientConfig = new JdbcClientConfig() @@ -155,7 +175,13 @@ public class JdbcExternalCatalog extends ExternalCatalog { .setOnlySpecifiedDatabase(getOnlySpecifiedDatabase()) .setIsLowerCaseTableNames(getLowerCaseTableNames()) .setIncludeDatabaseMap(getIncludeDatabaseMap()) - .setExcludeDatabaseMap(getExcludeDatabaseMap()); + .setExcludeDatabaseMap(getExcludeDatabaseMap()) + .setMinPoolSize(getMinPoolSize()) + .setMaxPoolSize(getMaxPoolSize()) + .setMinIdleSize(getMinPoolSize() > 0 ? 
1 : 0) + .setMaxIdleTime(getMaxIdleTime()) + .setMaxWaitTime(getMaxWaitTime()) + .setKeepAlive(getKeepAlive()); jdbcClient = JdbcClient.createJdbcClient(jdbcClientConfig); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClient.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClient.java index d53b720829..3e1f5a73f6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClient.java @@ -60,7 +60,6 @@ public abstract class JdbcClient { protected DruidDataSource dataSource = null; protected boolean isOnlySpecifiedDatabase; protected boolean isLowerCaseTableNames; - protected String oceanbaseMode = ""; protected Map includeDatabaseMap; protected Map excludeDatabaseMap; @@ -114,17 +113,16 @@ public abstract class JdbcClient { Optional.ofNullable(jdbcClientConfig.getExcludeDatabaseMap()).orElse(Collections.emptyMap()); String jdbcUrl = jdbcClientConfig.getJdbcUrl(); this.dbType = parseDbType(jdbcUrl); - initializeDataSource(jdbcClientConfig.getPassword(), jdbcUrl, jdbcClientConfig.getDriverUrl(), - jdbcClientConfig.getDriverClass()); + initializeDataSource(jdbcClientConfig); } // Initialize DruidDataSource - private void initializeDataSource(String password, String jdbcUrl, String driverUrl, String driverClass) { + private void initializeDataSource(JdbcClientConfig config) { ClassLoader oldClassLoader = Thread.currentThread().getContextClassLoader(); try { // TODO(ftw): The problem here is that the jar package is handled by FE // and URLClassLoader may load the jar package directly into memory - URL[] urls = {new URL(JdbcResource.getFullDriverUrl(driverUrl))}; + URL[] urls = {new URL(JdbcResource.getFullDriverUrl(config.getDriverUrl()))}; // set parent ClassLoader to null, we can achieve class loading isolation. 
ClassLoader parent = getClass().getClassLoader(); ClassLoader classLoader = URLClassLoader.newInstance(urls, parent); @@ -133,23 +131,30 @@ public abstract class JdbcClient { Thread.currentThread().setContextClassLoader(classLoader); dataSource = new DruidDataSource(); dataSource.setDriverClassLoader(classLoader); - dataSource.setDriverClassName(driverClass); - dataSource.setUrl(jdbcUrl); - dataSource.setUsername(jdbcUser); - dataSource.setPassword(password); - dataSource.setMinIdle(1); - dataSource.setInitialSize(1); - dataSource.setMaxActive(100); - dataSource.setTimeBetweenEvictionRunsMillis(600000); - dataSource.setMinEvictableIdleTimeMillis(300000); + dataSource.setDriverClassName(config.getDriverClass()); + dataSource.setUrl(config.getJdbcUrl()); + dataSource.setUsername(config.getUser()); + dataSource.setPassword(config.getPassword()); + dataSource.setMinIdle(config.getMinIdleSize()); + dataSource.setInitialSize(config.getMinPoolSize()); + dataSource.setMaxActive(config.getMaxPoolSize()); + dataSource.setTimeBetweenEvictionRunsMillis(config.getMaxIdleTime() * 2L); + dataSource.setMinEvictableIdleTimeMillis(config.getMaxIdleTime()); // set connection timeout to 5s. // The default is 30s, which is too long. // Because when querying information_schema db, BE will call thrift rpc(default timeout is 30s) // to FE to get schema info, and may create connection here, if we set it too long and the url is invalid, // it may cause the thrift rpc timeout. 
- dataSource.setMaxWait(5000); + dataSource.setMaxWait(config.getMaxWaitTime()); + dataSource.setKeepAlive(config.isKeepAlive()); + LOG.info("JdbcExecutor set minPoolSize = " + config.getMinPoolSize() + + ", maxPoolSize = " + config.getMaxPoolSize() + + ", maxIdleTime = " + config.getMaxIdleTime() + + ", maxWaitTime = " + config.getMaxWaitTime() + + ", minIdleSize = " + config.getMinIdleSize() + + ", keepAlive = " + config.isKeepAlive()); } catch (MalformedURLException e) { - throw new JdbcClientException("MalformedURLException to load class about " + driverUrl, e); + throw new JdbcClientException("MalformedURLException to load class about " + config.getDriverUrl(), e); } finally { Thread.currentThread().setContextClassLoader(oldClassLoader); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClientConfig.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClientConfig.java index 77beef9459..9ae38ba4d5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClientConfig.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClientConfig.java @@ -31,6 +31,13 @@ public class JdbcClientConfig implements Cloneable { private String driverClass; private String onlySpecifiedDatabase; private String isLowerCaseTableNames; + private int minPoolSize; + private int maxPoolSize; + private int minIdleSize; + private int maxIdleTime; + private int maxWaitTime; + private boolean keepAlive; + private Map includeDatabaseMap = Maps.newHashMap(); private Map excludeDatabaseMap = Maps.newHashMap(); private Map customizedProperties = Maps.newHashMap(); @@ -121,6 +128,60 @@ public class JdbcClientConfig implements Cloneable { return this; } + public int getMinPoolSize() { + return minPoolSize; + } + + public JdbcClientConfig setMinPoolSize(int minPoolSize) { + this.minPoolSize = minPoolSize; + return this; + } + + public int getMaxPoolSize() { + return maxPoolSize; + } + + public 
JdbcClientConfig setMaxPoolSize(int maxPoolSize) { + this.maxPoolSize = maxPoolSize; + return this; + } + + public int getMinIdleSize() { + return minIdleSize; + } + + public JdbcClientConfig setMinIdleSize(int minIdleSize) { + this.minIdleSize = minIdleSize; + return this; + } + + public int getMaxIdleTime() { + return maxIdleTime; + } + + public JdbcClientConfig setMaxIdleTime(int maxIdleTime) { + this.maxIdleTime = maxIdleTime; + return this; + } + + public int getMaxWaitTime() { + return maxWaitTime; + } + + public JdbcClientConfig setMaxWaitTime(int maxWaitTime) { + this.maxWaitTime = maxWaitTime; + return this; + } + + public boolean isKeepAlive() { + return keepAlive; + } + + public JdbcClientConfig setKeepAlive(boolean keepAlive) { + this.keepAlive = keepAlive; + return this; + } + public Map getIncludeDatabaseMap() { return includeDatabaseMap; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/jdbc/JdbcTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/jdbc/JdbcTableSink.java index 1f85ac3fef..f55821953e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/jdbc/JdbcTableSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/jdbc/JdbcTableSink.java @@ -37,30 +37,19 @@ import java.util.List; public class JdbcTableSink extends DataSink { private static final Logger LOG = LogManager.getLogger(JdbcTableSink.class); - private final String resourceName; private final String externalTableName; private final String dorisTableName; - private final String jdbcUrl; - private final String jdbcUser; - private final String jdbcPasswd; - private final String driverClass; - private final String driverUrl; - private final String checkSum; private final TOdbcTableType jdbcType; private final boolean useTransaction; private String insertSql; + private JdbcTable jdbcTable; + public JdbcTableSink(JdbcTable jdbcTable, List insertCols) { - resourceName = jdbcTable.getResourceName(); + 
this.jdbcTable = jdbcTable; jdbcType = jdbcTable.getJdbcTableType(); externalTableName = jdbcTable.getProperRealFullTableName(jdbcType); useTransaction = ConnectContext.get().getSessionVariable().isEnableOdbcTransaction(); - jdbcUrl = jdbcTable.getJdbcUrl(); - jdbcUser = jdbcTable.getJdbcUser(); - jdbcPasswd = jdbcTable.getJdbcPasswd(); - driverClass = jdbcTable.getDriverClass(); - driverUrl = jdbcTable.getDriverUrl(); - checkSum = jdbcTable.getCheckSum(); dorisTableName = jdbcTable.getName(); insertSql = jdbcTable.getInsertSql(insertCols); } @@ -81,20 +70,11 @@ public class JdbcTableSink extends DataSink { protected TDataSink toThrift() { TDataSink tDataSink = new TDataSink(TDataSinkType.JDBC_TABLE_SINK); TJdbcTableSink jdbcTableSink = new TJdbcTableSink(); - TJdbcTable jdbcTable = new TJdbcTable(); + TJdbcTable jdbcTable = this.jdbcTable.toThrift().getJdbcTable(); jdbcTableSink.setJdbcTable(jdbcTable); - jdbcTableSink.jdbc_table.setJdbcUrl(jdbcUrl); - jdbcTableSink.jdbc_table.setJdbcUser(jdbcUser); - jdbcTableSink.jdbc_table.setJdbcPassword(jdbcPasswd); - jdbcTableSink.jdbc_table.setJdbcTableName(externalTableName); - jdbcTableSink.jdbc_table.setJdbcDriverUrl(driverUrl); - jdbcTableSink.jdbc_table.setJdbcDriverClass(driverClass); - jdbcTableSink.jdbc_table.setJdbcDriverChecksum(checkSum); - jdbcTableSink.jdbc_table.setJdbcResourceName(resourceName); jdbcTableSink.setInsertSql(insertSql); jdbcTableSink.setUseTransaction(useTransaction); jdbcTableSink.setTableType(jdbcType); - tDataSink.setJdbcTableSink(jdbcTableSink); return tDataSink; } diff --git a/gensrc/thrift/Descriptors.thrift b/gensrc/thrift/Descriptors.thrift index 2e3831903c..16a37d1393 100644 --- a/gensrc/thrift/Descriptors.thrift +++ b/gensrc/thrift/Descriptors.thrift @@ -324,7 +324,11 @@ struct TJdbcTable { 6: optional string jdbc_resource_name 7: optional string jdbc_driver_class 8: optional string jdbc_driver_checksum - + 9: optional i32 jdbc_min_pool_size + 10: optional i32 jdbc_max_pool_size + 11: 
optional i32 jdbc_max_idle_time + 12: optional i32 jdbc_max_wait_time + 13: optional bool jdbc_keep_alive } struct TMCTable { diff --git a/gensrc/thrift/Types.thrift b/gensrc/thrift/Types.thrift index 2d0f380dbc..df9bac013a 100644 --- a/gensrc/thrift/Types.thrift +++ b/gensrc/thrift/Types.thrift @@ -424,6 +424,12 @@ struct TJdbcExecutorCtorParams { 8: optional string driver_path 9: optional TOdbcTableType table_type + + 10: optional i32 min_pool_size + 11: optional i32 max_pool_size + 12: optional i32 max_idle_time + 13: optional i32 max_wait_time + 14: optional bool keep_alive } struct TJavaUdfExecutorCtorParams {