[enhencement](jdbc catalog) Use Druid instead of HikariCP in JdbcClient (#17395)

This pr does three things:
1. Use Druid instead of HikariCP in JdbcClient
2. when download udf jar, add the name of the jar package after the local file name.
3. refactor some jdbcResource code
This commit is contained in:
Tiewei Fang
2023-03-07 08:51:10 +08:00
committed by GitHub
parent aedbc5fcb1
commit 48c2d806d7
10 changed files with 82 additions and 96 deletions

View File

@ -252,14 +252,15 @@ Status UserFunctionCache::_get_cache_entry(int64_t fid, const std::string& url,
const std::string& checksum,
UserFunctionCacheEntry** output_entry, LibType type) {
UserFunctionCacheEntry* entry = nullptr;
std::string file_name = _get_file_name_from_url(url);
{
std::lock_guard<std::mutex> l(_cache_lock);
auto it = _entry_map.find(fid);
if (it != _entry_map.end()) {
entry = it->second;
} else {
entry = new UserFunctionCacheEntry(fid, checksum, _make_lib_file(fid, checksum, type),
type);
entry = new UserFunctionCacheEntry(
fid, checksum, _make_lib_file(fid, checksum, type, file_name), type);
entry->ref();
_entry_map.emplace(fid, entry);
}
@ -376,6 +377,17 @@ std::string UserFunctionCache::_get_real_url(const std::string& url) {
return url;
}
std::string UserFunctionCache::_get_file_name_from_url(const std::string& url) const {
std::string file_name;
size_t last_slash_pos = url.find_last_of('/');
if (last_slash_pos != std::string::npos) {
file_name = url.substr(last_slash_pos + 1, url.size());
} else {
file_name = url;
}
return file_name;
}
// entry's lock must be held
Status UserFunctionCache::_load_cache_entry_internal(UserFunctionCacheEntry* entry) {
RETURN_IF_ERROR(dynamic_open(entry->lib_file.c_str(), &entry->lib_handle));
@ -384,12 +396,12 @@ Status UserFunctionCache::_load_cache_entry_internal(UserFunctionCacheEntry* ent
}
std::string UserFunctionCache::_make_lib_file(int64_t function_id, const std::string& checksum,
LibType type) {
LibType type, const std::string& file_name) {
int shard = function_id % kLibShardNum;
std::stringstream ss;
ss << _lib_dir << '/' << shard << '/' << function_id << '.' << checksum;
if (type == LibType::JAR) {
ss << ".jar";
ss << '.' << file_name;
} else {
ss << ".so";
}
@ -417,6 +429,7 @@ Status UserFunctionCache::check_jar(int64_t fid, const std::string& url,
const std::string& checksum) {
UserFunctionCacheEntry* entry = nullptr;
Status st = Status::OK();
std::string file_name = _get_file_name_from_url(url);
{
std::lock_guard<std::mutex> l(_cache_lock);
auto it = _entry_map.find(fid);
@ -424,7 +437,8 @@ Status UserFunctionCache::check_jar(int64_t fid, const std::string& url,
entry = it->second;
} else {
entry = new UserFunctionCacheEntry(
fid, checksum, _make_lib_file(fid, checksum, LibType::JAR), LibType::JAR);
fid, checksum, _make_lib_file(fid, checksum, LibType::JAR, file_name),
LibType::JAR);
entry->ref();
_entry_map.emplace(fid, entry);
}

View File

@ -80,10 +80,12 @@ private:
Status _download_lib(const std::string& url, UserFunctionCacheEntry* entry);
Status _load_cache_entry_internal(UserFunctionCacheEntry* entry);
std::string _make_lib_file(int64_t function_id, const std::string& checksum, LibType type);
std::string _make_lib_file(int64_t function_id, const std::string& checksum, LibType type,
const std::string& file_name);
void _destroy_cache_entry(UserFunctionCacheEntry* entry);
std::string _get_real_url(const std::string& url);
std::string _get_file_name_from_url(const std::string& url) const;
private:
std::string _lib_dir;

View File

@ -140,6 +140,7 @@ Status JdbcConnector::open(RuntimeState* state, bool read) {
std::abs((int64_t)hash_str(_conn_param.resource_name)), _conn_param.driver_path,
_conn_param.driver_checksum, &local_location));
}
VLOG_QUERY << "driver local path = " << local_location;
TJdbcExecutorCtorParams ctor_params;
ctor_params.__set_statement(_sql_str);

View File

@ -795,9 +795,10 @@ under the License.
<version>3.10.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.alibaba/druid -->
<dependency>
<groupId>com.zaxxer</groupId>
<artifactId>HikariCP</artifactId>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
</dependency>
<!-- for aliyun dlf -->

View File

@ -26,6 +26,7 @@ import org.apache.doris.common.proc.BaseProcResult;
import org.apache.doris.common.util.Util;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.gson.annotations.SerializedName;
@ -39,7 +40,6 @@ import java.net.URI;
import java.net.URISyntaxException;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.List;
import java.util.Map;
@ -84,13 +84,23 @@ public class JdbcResource extends Resource {
public static final String ONLY_SPECIFIED_DATABASE = "only_specified_database";
public static final String LOWER_CASE_TABLE_NAMES = "lower_case_table_names";
public static final String CHECK_SUM = "checksum";
private static final List<String> OPTIONAL_PROPERTIES = Lists.newArrayList(
private static final ImmutableList<String> ALL_PROPERTIES = new ImmutableList.Builder<String>().add(
JDBC_URL,
USER,
PASSWORD,
DRIVER_CLASS,
DRIVER_URL,
TYPE,
ONLY_SPECIFIED_DATABASE,
LOWER_CASE_TABLE_NAMES
);
).build();
private static final ImmutableList<String> OPTIONAL_PROPERTIES = new ImmutableList.Builder<String>().add(
ONLY_SPECIFIED_DATABASE,
LOWER_CASE_TABLE_NAMES
).build();
// The default value of optional properties
// if one optional property is not specified, will use default value
private static final Map<String, String> OPTIONAL_PROPERTIES_DEFAULT_VALUE = Maps.newHashMap();
static {
@ -116,29 +126,12 @@ public class JdbcResource extends Resource {
this.configs = configs;
}
public JdbcResource getCopiedResource() {
return new JdbcResource(name, Maps.newHashMap(configs));
}
private void checkProperties(String propertiesKey) throws DdlException {
// check the properties key
String value = configs.get(propertiesKey);
if (value == null) {
throw new DdlException("JdbcResource Missing " + propertiesKey + " in properties");
}
}
@Override
public void modifyProperties(Map<String, String> properties) throws DdlException {
// modify properties
replaceIfEffectiveValue(this.configs, DRIVER_URL, properties.get(DRIVER_URL));
replaceIfEffectiveValue(this.configs, DRIVER_CLASS, properties.get(DRIVER_CLASS));
replaceIfEffectiveValue(this.configs, JDBC_URL, properties.get(JDBC_URL));
replaceIfEffectiveValue(this.configs, USER, properties.get(USER));
replaceIfEffectiveValue(this.configs, PASSWORD, properties.get(PASSWORD));
replaceIfEffectiveValue(this.configs, TYPE, properties.get(TYPE));
replaceIfEffectiveValue(this.configs, ONLY_SPECIFIED_DATABASE, properties.get(ONLY_SPECIFIED_DATABASE));
replaceIfEffectiveValue(this.configs, LOWER_CASE_TABLE_NAMES, properties.get(LOWER_CASE_TABLE_NAMES));
for (String propertyKey : ALL_PROPERTIES) {
replaceIfEffectiveValue(this.configs, propertyKey, properties.get(propertyKey));
}
this.configs.put(JDBC_URL, handleJdbcUrl(getProperty(JDBC_URL)));
super.modifyProperties(properties);
}
@ -147,14 +140,9 @@ public class JdbcResource extends Resource {
public void checkProperties(Map<String, String> properties) throws AnalysisException {
Map<String, String> copiedProperties = Maps.newHashMap(properties);
// check properties
copiedProperties.remove(DRIVER_URL);
copiedProperties.remove(DRIVER_CLASS);
copiedProperties.remove(JDBC_URL);
copiedProperties.remove(USER);
copiedProperties.remove(PASSWORD);
copiedProperties.remove(TYPE);
copiedProperties.remove(ONLY_SPECIFIED_DATABASE);
copiedProperties.remove(LOWER_CASE_TABLE_NAMES);
for (String propertyKey : ALL_PROPERTIES) {
copiedProperties.remove(propertyKey);
}
if (!copiedProperties.isEmpty()) {
throw new AnalysisException("Unknown JDBC catalog resource properties: " + copiedProperties);
}
@ -164,30 +152,19 @@ public class JdbcResource extends Resource {
protected void setProperties(Map<String, String> properties) throws DdlException {
Preconditions.checkState(properties != null);
for (String key : properties.keySet()) {
switch (key) {
case DRIVER_URL:
case JDBC_URL:
case USER:
case PASSWORD:
case TYPE:
case DRIVER_CLASS:
case ONLY_SPECIFIED_DATABASE: // optional argument
case LOWER_CASE_TABLE_NAMES: // optional argument
break;
default:
throw new DdlException("JDBC resource Property of " + key + " is unknown");
if (!ALL_PROPERTIES.contains(key)) {
throw new DdlException("JDBC resource Property of " + key + " is unknown");
}
}
configs = properties;
handleOptionalArguments();
checkProperties(DRIVER_URL);
checkProperties(DRIVER_CLASS);
checkProperties(JDBC_URL);
checkProperties(USER);
checkProperties(PASSWORD);
checkProperties(TYPE);
checkProperties(ONLY_SPECIFIED_DATABASE);
checkProperties(LOWER_CASE_TABLE_NAMES);
// check properties
for (String property : ALL_PROPERTIES) {
String value = configs.get(property);
if (value == null) {
throw new DdlException("JdbcResource Missing " + property + " in properties");
}
}
this.configs.put(JDBC_URL, handleJdbcUrl(getProperty(JDBC_URL)));
configs.put(CHECK_SUM, computeObjectChecksum(getProperty(DRIVER_URL)));
}

View File

@ -76,29 +76,15 @@ public class OdbcTable extends Table {
private static String mysqlProperName(String name) {
// In JdbcExternalTable, the name contains databaseName, like: db.table
// So, we should split db and table, then switch to `db`.`table`.
String[] fields = name.split("\\.");
String result = "";
for (int i = 0; i < fields.length; ++i) {
if (i != 0) {
result += ".";
}
result += ("`" + fields[i] + "`");
}
return result;
List<String> list = Arrays.asList(name.split("\\."));
return list.stream().map(s -> "`" + s + "`").collect(Collectors.joining("."));
}
private static String mssqlProperName(String name) {
// In JdbcExternalTable, the name contains databaseName, like: db.table
// So, we should split db and table, then switch to [db].[table].
String[] fields = name.split("\\.");
String result = "";
for (int i = 0; i < fields.length; ++i) {
if (i != 0) {
result += ".";
}
result += ("[" + fields[i] + "]");
}
return result;
List<String> list = Arrays.asList(name.split("\\."));
return list.stream().map(s -> "[" + s + "]").collect(Collectors.joining("."));
}
private static String psqlProperName(String name) {

View File

@ -88,6 +88,7 @@ public class JdbcExternalTable extends ExternalTable {
jdbcTable.setJdbcPasswd(jdbcCatalog.getJdbcPasswd());
jdbcTable.setDriverClass(jdbcCatalog.getDriverClass());
jdbcTable.setDriverUrl(jdbcCatalog.getDriverUrl());
jdbcTable.setResourceName(jdbcCatalog.getResource());
jdbcTable.setCheckSum(jdbcCatalog.getCheckSum());
return jdbcTable;
}

View File

@ -25,10 +25,9 @@ import org.apache.doris.catalog.Type;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import com.alibaba.druid.pool.DruidDataSource;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import lombok.Data;
import lombok.Getter;
import org.apache.logging.log4j.LogManager;
@ -56,8 +55,7 @@ public class JdbcClient {
private URLClassLoader classLoader = null;
private HikariDataSource dataSource = null;
private DruidDataSource dataSource = null;
private boolean isOnlySpecifiedDatabase = false;
private boolean isLowerCaseTableNames = false;
@ -81,21 +79,26 @@ public class JdbcClient {
// and URLClassLoader may load the jar package directly into memory
URL[] urls = {new URL(JdbcResource.getFullDriverUrl(driverUrl))};
// set parent ClassLoader to null, we can achieve class loading isolation.
classLoader = URLClassLoader.newInstance(urls, null);
ClassLoader parent = getClass().getClassLoader();
ClassLoader classLoader = URLClassLoader.newInstance(urls, parent);
LOG.debug("parent ClassLoader: {}, old ClassLoader: {}, class Loader: {}.",
parent, oldClassLoader, classLoader);
Thread.currentThread().setContextClassLoader(classLoader);
HikariConfig config = new HikariConfig();
config.setDriverClassName(driverClass);
config.setJdbcUrl(jdbcUrl);
config.setUsername(jdbcUser);
config.setPassword(password);
config.setMaximumPoolSize(1);
dataSource = new DruidDataSource();
dataSource.setDriverClassLoader(classLoader);
dataSource.setDriverClassName(driverClass);
dataSource.setUrl(jdbcUrl);
dataSource.setUsername(jdbcUser);
dataSource.setPassword(password);
dataSource.setMinIdle(1);
dataSource.setInitialSize(2);
dataSource.setMaxActive(5);
// set connection timeout to 5s.
// The default is 30s, which is too long.
// Because when querying information_schema db, BE will call thrift rpc(default timeout is 30s)
// to FE to get schema info, and may create connection here, if we set it too long and the url is invalid,
// it may cause the thrift rpc timeout.
config.setConnectionTimeout(5000);
dataSource = new HikariDataSource(config);
dataSource.setMaxWait(5000);
} catch (MalformedURLException e) {
throw new JdbcClientException("MalformedURLException to load class about " + driverUrl, e);
} finally {

View File

@ -90,12 +90,13 @@ under the License.
<artifactId>ojdbc6</artifactId>
<version>11.2.0.4</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.alibaba/druid -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>1.2.5</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.junit.jupiter/junit-jupiter-engine -->
<dependency>
<groupId>org.junit.jupiter</groupId>

View File

@ -187,7 +187,7 @@ under the License.
<commons-io.version>2.6</commons-io.version>
<json-simple.version>1.1.1</json-simple.version>
<junit.version>5.8.2</junit.version>
<hikaricp.version>3.4.5</hikaricp.version>
<druid.version>1.2.5</druid.version>
<thrift.version>0.13.0</thrift.version>
<log4j2.version>2.18.0</log4j2.version>
<metrics-core.version>4.0.2</metrics-core.version>
@ -988,9 +988,9 @@ under the License.
<version>${dlf-metastore-client-hive.version}</version>
</dependency>
<dependency>
<groupId>com.zaxxer</groupId>
<artifactId>HikariCP</artifactId>
<version>${hikaricp.version}</version>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>${druid.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/joda-time/joda-time -->
<dependency>