[improvement](jdbc catalog) Put the jdbc connection pool parameters into catalog properties (#29195)
This commit is contained in:
@ -94,6 +94,11 @@ public class JdbcResource extends Resource {
|
||||
public static final String TYPE = "type";
|
||||
public static final String ONLY_SPECIFIED_DATABASE = "only_specified_database";
|
||||
public static final String LOWER_CASE_TABLE_NAMES = "lower_case_table_names";
|
||||
public static final String MIN_POOL_SIZE = "min_pool_size";
|
||||
public static final String MAX_POOL_SIZE = "max_pool_size";
|
||||
public static final String MAX_IDLE_TIME = "max_idle_time";
|
||||
public static final String MAX_WAIT_TIME = "max_wait_time";
|
||||
public static final String KEEP_ALIVE = "keep_alive";
|
||||
public static final String CHECK_SUM = "checksum";
|
||||
private static final ImmutableList<String> ALL_PROPERTIES = new ImmutableList.Builder<String>().add(
|
||||
JDBC_URL,
|
||||
@ -111,7 +116,12 @@ public class JdbcResource extends Resource {
|
||||
ONLY_SPECIFIED_DATABASE,
|
||||
LOWER_CASE_TABLE_NAMES,
|
||||
INCLUDE_DATABASE_LIST,
|
||||
EXCLUDE_DATABASE_LIST
|
||||
EXCLUDE_DATABASE_LIST,
|
||||
MIN_POOL_SIZE,
|
||||
MAX_POOL_SIZE,
|
||||
MAX_IDLE_TIME,
|
||||
MAX_WAIT_TIME,
|
||||
KEEP_ALIVE
|
||||
).build();
|
||||
|
||||
// The default value of optional properties
|
||||
@ -123,6 +133,11 @@ public class JdbcResource extends Resource {
|
||||
OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(LOWER_CASE_TABLE_NAMES, "false");
|
||||
OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(INCLUDE_DATABASE_LIST, "");
|
||||
OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(EXCLUDE_DATABASE_LIST, "");
|
||||
OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(MIN_POOL_SIZE, "1");
|
||||
OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(MAX_POOL_SIZE, "100");
|
||||
OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(MAX_IDLE_TIME, "30000");
|
||||
OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(MAX_WAIT_TIME, "5000");
|
||||
OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(KEEP_ALIVE, "false");
|
||||
}
|
||||
|
||||
// timeout for both connection and read. 10 seconds is long enough.
|
||||
|
||||
@ -81,6 +81,12 @@ public class JdbcTable extends Table {
|
||||
private String driverUrl;
|
||||
private String checkSum;
|
||||
|
||||
private int minPoolSize = 1;
|
||||
private int maxPoolSize = 100;
|
||||
private int maxIdleTime = 30000;
|
||||
private int maxWaitTime = 5000;
|
||||
private boolean keepAlive = false;
|
||||
|
||||
static {
|
||||
Map<String, TOdbcTableType> tempMap = new CaseInsensitiveMap();
|
||||
tempMap.put("nebula", TOdbcTableType.NEBULA);
|
||||
@ -163,6 +169,26 @@ public class JdbcTable extends Table {
|
||||
return getFromJdbcResourceOrDefault(JdbcResource.DRIVER_URL, driverUrl);
|
||||
}
|
||||
|
||||
public int getMinPoolSize() {
|
||||
return Integer.parseInt(getFromJdbcResourceOrDefault(JdbcResource.MIN_POOL_SIZE, String.valueOf(minPoolSize)));
|
||||
}
|
||||
|
||||
public int getMaxPoolSize() {
|
||||
return Integer.parseInt(getFromJdbcResourceOrDefault(JdbcResource.MAX_POOL_SIZE, String.valueOf(maxPoolSize)));
|
||||
}
|
||||
|
||||
public int getMaxIdleTime() {
|
||||
return Integer.parseInt(getFromJdbcResourceOrDefault(JdbcResource.MAX_IDLE_TIME, String.valueOf(maxIdleTime)));
|
||||
}
|
||||
|
||||
public int getMaxWaitTime() {
|
||||
return Integer.parseInt(getFromJdbcResourceOrDefault(JdbcResource.MAX_WAIT_TIME, String.valueOf(maxWaitTime)));
|
||||
}
|
||||
|
||||
public boolean getKeepAlive() {
|
||||
return Boolean.parseBoolean(getFromJdbcResourceOrDefault(JdbcResource.KEEP_ALIVE, String.valueOf(keepAlive)));
|
||||
}
|
||||
|
||||
private String getFromJdbcResourceOrDefault(String key, String defaultVal) {
|
||||
if (Strings.isNullOrEmpty(resourceName)) {
|
||||
return defaultVal;
|
||||
@ -185,6 +211,11 @@ public class JdbcTable extends Table {
|
||||
tJdbcTable.setJdbcDriverUrl(getDriverUrl());
|
||||
tJdbcTable.setJdbcResourceName(resourceName);
|
||||
tJdbcTable.setJdbcDriverChecksum(checkSum);
|
||||
tJdbcTable.setJdbcMinPoolSize(getMinPoolSize());
|
||||
tJdbcTable.setJdbcMaxPoolSize(getMaxPoolSize());
|
||||
tJdbcTable.setJdbcMaxIdleTime(getMaxIdleTime());
|
||||
tJdbcTable.setJdbcMaxWaitTime(getMaxWaitTime());
|
||||
tJdbcTable.setJdbcKeepAlive(getKeepAlive());
|
||||
TTableDescriptor tTableDescriptor = new TTableDescriptor(getId(), TTableType.JDBC_TABLE, fullSchema.size(), 0,
|
||||
getName(), "");
|
||||
tTableDescriptor.setJdbcTable(tJdbcTable);
|
||||
|
||||
@ -101,6 +101,11 @@ public class JdbcExternalTable extends ExternalTable {
|
||||
jdbcTable.setDriverUrl(jdbcCatalog.getDriverUrl());
|
||||
jdbcTable.setResourceName(jdbcCatalog.getResource());
|
||||
jdbcTable.setCheckSum(jdbcCatalog.getCheckSum());
|
||||
jdbcTable.setMinPoolSize(jdbcCatalog.getMinPoolSize());
|
||||
jdbcTable.setMaxPoolSize(jdbcCatalog.getMaxPoolSize());
|
||||
jdbcTable.setMaxIdleTime(jdbcCatalog.getMaxIdleTime());
|
||||
jdbcTable.setMaxWaitTime(jdbcCatalog.getMaxWaitTime());
|
||||
jdbcTable.setKeepAlive(jdbcCatalog.getKeepAlive());
|
||||
return jdbcTable;
|
||||
}
|
||||
|
||||
|
||||
@ -143,6 +143,26 @@ public class JdbcExternalCatalog extends ExternalCatalog {
|
||||
return catalogProperty.getOrDefault(JdbcResource.LOWER_CASE_TABLE_NAMES, "false");
|
||||
}
|
||||
|
||||
public int getMinPoolSize() {
|
||||
return Integer.parseInt(catalogProperty.getOrDefault(JdbcResource.MIN_POOL_SIZE, "1"));
|
||||
}
|
||||
|
||||
public int getMaxPoolSize() {
|
||||
return Integer.parseInt(catalogProperty.getOrDefault(JdbcResource.MAX_POOL_SIZE, "100"));
|
||||
}
|
||||
|
||||
public int getMaxIdleTime() {
|
||||
return Integer.parseInt(catalogProperty.getOrDefault(JdbcResource.MAX_IDLE_TIME, "300000"));
|
||||
}
|
||||
|
||||
public int getMaxWaitTime() {
|
||||
return Integer.parseInt(catalogProperty.getOrDefault(JdbcResource.MAX_WAIT_TIME, "5000"));
|
||||
}
|
||||
|
||||
public boolean getKeepAlive() {
|
||||
return Boolean.parseBoolean(catalogProperty.getOrDefault(JdbcResource.KEEP_ALIVE, "false"));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void initLocalObjectsImpl() {
|
||||
JdbcClientConfig jdbcClientConfig = new JdbcClientConfig()
|
||||
@ -155,7 +175,13 @@ public class JdbcExternalCatalog extends ExternalCatalog {
|
||||
.setOnlySpecifiedDatabase(getOnlySpecifiedDatabase())
|
||||
.setIsLowerCaseTableNames(getLowerCaseTableNames())
|
||||
.setIncludeDatabaseMap(getIncludeDatabaseMap())
|
||||
.setExcludeDatabaseMap(getExcludeDatabaseMap());
|
||||
.setExcludeDatabaseMap(getExcludeDatabaseMap())
|
||||
.setMinPoolSize(getMinPoolSize())
|
||||
.setMaxPoolSize(getMaxPoolSize())
|
||||
.setMinIdleSize(getMinPoolSize() > 0 ? 1 : 0)
|
||||
.setMaxIdleTime(getMaxIdleTime())
|
||||
.setMaxWaitTime(getMaxWaitTime())
|
||||
.setKeepAlive(getKeepAlive());
|
||||
|
||||
jdbcClient = JdbcClient.createJdbcClient(jdbcClientConfig);
|
||||
}
|
||||
|
||||
@ -60,7 +60,6 @@ public abstract class JdbcClient {
|
||||
protected DruidDataSource dataSource = null;
|
||||
protected boolean isOnlySpecifiedDatabase;
|
||||
protected boolean isLowerCaseTableNames;
|
||||
protected String oceanbaseMode = "";
|
||||
|
||||
protected Map<String, Boolean> includeDatabaseMap;
|
||||
protected Map<String, Boolean> excludeDatabaseMap;
|
||||
@ -114,17 +113,16 @@ public abstract class JdbcClient {
|
||||
Optional.ofNullable(jdbcClientConfig.getExcludeDatabaseMap()).orElse(Collections.emptyMap());
|
||||
String jdbcUrl = jdbcClientConfig.getJdbcUrl();
|
||||
this.dbType = parseDbType(jdbcUrl);
|
||||
initializeDataSource(jdbcClientConfig.getPassword(), jdbcUrl, jdbcClientConfig.getDriverUrl(),
|
||||
jdbcClientConfig.getDriverClass());
|
||||
initializeDataSource(jdbcClientConfig);
|
||||
}
|
||||
|
||||
// Initialize DruidDataSource
|
||||
private void initializeDataSource(String password, String jdbcUrl, String driverUrl, String driverClass) {
|
||||
private void initializeDataSource(JdbcClientConfig config) {
|
||||
ClassLoader oldClassLoader = Thread.currentThread().getContextClassLoader();
|
||||
try {
|
||||
// TODO(ftw): The problem here is that the jar package is handled by FE
|
||||
// and URLClassLoader may load the jar package directly into memory
|
||||
URL[] urls = {new URL(JdbcResource.getFullDriverUrl(driverUrl))};
|
||||
URL[] urls = {new URL(JdbcResource.getFullDriverUrl(config.getDriverUrl()))};
|
||||
// set parent ClassLoader to null, we can achieve class loading isolation.
|
||||
ClassLoader parent = getClass().getClassLoader();
|
||||
ClassLoader classLoader = URLClassLoader.newInstance(urls, parent);
|
||||
@ -133,23 +131,30 @@ public abstract class JdbcClient {
|
||||
Thread.currentThread().setContextClassLoader(classLoader);
|
||||
dataSource = new DruidDataSource();
|
||||
dataSource.setDriverClassLoader(classLoader);
|
||||
dataSource.setDriverClassName(driverClass);
|
||||
dataSource.setUrl(jdbcUrl);
|
||||
dataSource.setUsername(jdbcUser);
|
||||
dataSource.setPassword(password);
|
||||
dataSource.setMinIdle(1);
|
||||
dataSource.setInitialSize(1);
|
||||
dataSource.setMaxActive(100);
|
||||
dataSource.setTimeBetweenEvictionRunsMillis(600000);
|
||||
dataSource.setMinEvictableIdleTimeMillis(300000);
|
||||
dataSource.setDriverClassName(config.getDriverClass());
|
||||
dataSource.setUrl(config.getJdbcUrl());
|
||||
dataSource.setUsername(config.getUser());
|
||||
dataSource.setPassword(config.getPassword());
|
||||
dataSource.setMinIdle(config.getMinIdleSize());
|
||||
dataSource.setInitialSize(config.getMinPoolSize());
|
||||
dataSource.setMaxActive(config.getMaxPoolSize());
|
||||
dataSource.setTimeBetweenEvictionRunsMillis(config.getMaxIdleTime() * 2L);
|
||||
dataSource.setMinEvictableIdleTimeMillis(config.getMaxIdleTime());
|
||||
// set connection timeout to 5s.
|
||||
// The default is 30s, which is too long.
|
||||
// Because when querying information_schema db, BE will call thrift rpc(default timeout is 30s)
|
||||
// to FE to get schema info, and may create connection here, if we set it too long and the url is invalid,
|
||||
// it may cause the thrift rpc timeout.
|
||||
dataSource.setMaxWait(5000);
|
||||
dataSource.setMaxWait(config.getMaxWaitTime());
|
||||
dataSource.setKeepAlive(config.isKeepAlive());
|
||||
LOG.info("JdbcExecutor set minPoolSize = " + config.getMinPoolSize()
|
||||
+ ", maxPoolSize = " + config.getMaxPoolSize()
|
||||
+ ", maxIdleTime = " + config.getMaxIdleTime()
|
||||
+ ", maxWaitTime = " + config.getMaxWaitTime()
|
||||
+ ", minIdleSize = " + config.getMinIdleSize()
|
||||
+ ", keepAlive = " + config.isKeepAlive());
|
||||
} catch (MalformedURLException e) {
|
||||
throw new JdbcClientException("MalformedURLException to load class about " + driverUrl, e);
|
||||
throw new JdbcClientException("MalformedURLException to load class about " + config.getDriverUrl(), e);
|
||||
} finally {
|
||||
Thread.currentThread().setContextClassLoader(oldClassLoader);
|
||||
}
|
||||
|
||||
@ -31,6 +31,13 @@ public class JdbcClientConfig implements Cloneable {
|
||||
private String driverClass;
|
||||
private String onlySpecifiedDatabase;
|
||||
private String isLowerCaseTableNames;
|
||||
private int minPoolSize;
|
||||
private int maxPoolSize;
|
||||
private int minIdleSize;
|
||||
private int maxIdleTime;
|
||||
private int maxWaitTime;
|
||||
private boolean keepAlive;
|
||||
|
||||
private Map<String, Boolean> includeDatabaseMap = Maps.newHashMap();
|
||||
private Map<String, Boolean> excludeDatabaseMap = Maps.newHashMap();
|
||||
private Map<String, String> customizedProperties = Maps.newHashMap();
|
||||
@ -121,6 +128,60 @@ public class JdbcClientConfig implements Cloneable {
|
||||
return this;
|
||||
}
|
||||
|
||||
public int getMinPoolSize() {
|
||||
return minPoolSize;
|
||||
}
|
||||
|
||||
public JdbcClientConfig setMinPoolSize(int minPoolSize) {
|
||||
this.minPoolSize = minPoolSize;
|
||||
return this;
|
||||
}
|
||||
|
||||
public int getMaxPoolSize() {
|
||||
return maxPoolSize;
|
||||
}
|
||||
|
||||
public JdbcClientConfig setMaxPoolSize(int maxPoolSize) {
|
||||
this.maxPoolSize = maxPoolSize;
|
||||
return this;
|
||||
}
|
||||
|
||||
public int getMinIdleSize() {
|
||||
return minIdleSize;
|
||||
}
|
||||
|
||||
public JdbcClientConfig setMinIdleSize(int minIdleSize) {
|
||||
this.minIdleSize = minIdleSize;
|
||||
return this;
|
||||
}
|
||||
|
||||
public int getMaxIdleTime() {
|
||||
return maxIdleTime;
|
||||
}
|
||||
|
||||
public JdbcClientConfig setMaxIdleTime(int maxIdleTime) {
|
||||
this.maxIdleTime = maxIdleTime;
|
||||
return this;
|
||||
}
|
||||
|
||||
public int getMaxWaitTime() {
|
||||
return maxWaitTime;
|
||||
}
|
||||
|
||||
public JdbcClientConfig setMaxWaitTime(int maxWaitTime) {
|
||||
this.maxWaitTime = maxWaitTime;
|
||||
return this;
|
||||
}
|
||||
|
||||
public boolean isKeepAlive() {
|
||||
return keepAlive;
|
||||
}
|
||||
|
||||
public JdbcClientConfig setKeepAlive(boolean keepAlive) {
|
||||
this.keepAlive = keepAlive;
|
||||
return this;
|
||||
}
|
||||
|
||||
public Map<String, Boolean> getIncludeDatabaseMap() {
|
||||
return includeDatabaseMap;
|
||||
}
|
||||
|
||||
@ -37,30 +37,19 @@ import java.util.List;
|
||||
public class JdbcTableSink extends DataSink {
|
||||
private static final Logger LOG = LogManager.getLogger(JdbcTableSink.class);
|
||||
|
||||
private final String resourceName;
|
||||
private final String externalTableName;
|
||||
private final String dorisTableName;
|
||||
private final String jdbcUrl;
|
||||
private final String jdbcUser;
|
||||
private final String jdbcPasswd;
|
||||
private final String driverClass;
|
||||
private final String driverUrl;
|
||||
private final String checkSum;
|
||||
private final TOdbcTableType jdbcType;
|
||||
private final boolean useTransaction;
|
||||
private String insertSql;
|
||||
|
||||
private JdbcTable jdbcTable;
|
||||
|
||||
public JdbcTableSink(JdbcTable jdbcTable, List<String> insertCols) {
|
||||
resourceName = jdbcTable.getResourceName();
|
||||
this.jdbcTable = jdbcTable;
|
||||
jdbcType = jdbcTable.getJdbcTableType();
|
||||
externalTableName = jdbcTable.getProperRealFullTableName(jdbcType);
|
||||
useTransaction = ConnectContext.get().getSessionVariable().isEnableOdbcTransaction();
|
||||
jdbcUrl = jdbcTable.getJdbcUrl();
|
||||
jdbcUser = jdbcTable.getJdbcUser();
|
||||
jdbcPasswd = jdbcTable.getJdbcPasswd();
|
||||
driverClass = jdbcTable.getDriverClass();
|
||||
driverUrl = jdbcTable.getDriverUrl();
|
||||
checkSum = jdbcTable.getCheckSum();
|
||||
dorisTableName = jdbcTable.getName();
|
||||
insertSql = jdbcTable.getInsertSql(insertCols);
|
||||
}
|
||||
@ -81,20 +70,11 @@ public class JdbcTableSink extends DataSink {
|
||||
protected TDataSink toThrift() {
|
||||
TDataSink tDataSink = new TDataSink(TDataSinkType.JDBC_TABLE_SINK);
|
||||
TJdbcTableSink jdbcTableSink = new TJdbcTableSink();
|
||||
TJdbcTable jdbcTable = new TJdbcTable();
|
||||
TJdbcTable jdbcTable = this.jdbcTable.toThrift().getJdbcTable();
|
||||
jdbcTableSink.setJdbcTable(jdbcTable);
|
||||
jdbcTableSink.jdbc_table.setJdbcUrl(jdbcUrl);
|
||||
jdbcTableSink.jdbc_table.setJdbcUser(jdbcUser);
|
||||
jdbcTableSink.jdbc_table.setJdbcPassword(jdbcPasswd);
|
||||
jdbcTableSink.jdbc_table.setJdbcTableName(externalTableName);
|
||||
jdbcTableSink.jdbc_table.setJdbcDriverUrl(driverUrl);
|
||||
jdbcTableSink.jdbc_table.setJdbcDriverClass(driverClass);
|
||||
jdbcTableSink.jdbc_table.setJdbcDriverChecksum(checkSum);
|
||||
jdbcTableSink.jdbc_table.setJdbcResourceName(resourceName);
|
||||
jdbcTableSink.setInsertSql(insertSql);
|
||||
jdbcTableSink.setUseTransaction(useTransaction);
|
||||
jdbcTableSink.setTableType(jdbcType);
|
||||
|
||||
tDataSink.setJdbcTableSink(jdbcTableSink);
|
||||
return tDataSink;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user