[Enhencement](Jdbc catalog) Add two optional properties for jdbc catalog (#17245)
1. The first property is `only_specified_database`:
In the past, `Jdbc Catalog` will synchronize all database from source database.
Now we add a parameter called `only_specified_database` to jdbc catalog to allow only the specified database to be synchronized, eg:
```sql
create resource if not exists ${resource_name} properties(
"type"="jdbc",
"user"="root",
"password"="123456",
"jdbc_url" = "jdbc:mysql://172.18.0.1:${mysql_port}/doris_test?useSSL=false",
"driver_url" = "https://doris-community-test-1308700295.cos.ap-hongkong.myqcloud.com/jdbc_driver/mysql-connector-java-8.0.25.jar",
"driver_class" = "com.mysql.cj.jdbc.Driver",
"only_specified_database" = "true"
);
```
if `only_specified_database` is `true`, jdbc catalog will only synchronize the database which is specified in `jdbc_url`.
2. The second property is `lower_case_table_names`:
This property will synchronize jdbc external data source table names in lower case.
```sql
create resource if not exists ${resource_name} properties(
"type"="jdbc",
"user"="doris_test",
"password"="123456",
"jdbc_url" = "jdbc:oracle:thin:@172.18.0.1:${oracle_port}:${SID}",
"driver_url" = "https://doris-community-test-1308700295.cos.ap-hongkong.myqcloud.com/jdbc_driver/ojdbc8.jar",
"driver_class" = "oracle.jdbc.driver.OracleDriver",
"lower_case_table_names" = "true"
);
```
This commit is contained in:
@ -39,6 +39,7 @@ import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.security.MessageDigest;
|
||||
import java.security.NoSuchAlgorithmException;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
|
||||
@ -80,8 +81,23 @@ public class JdbcResource extends Resource {
|
||||
public static final String DRIVER_CLASS = "driver_class";
|
||||
public static final String DRIVER_URL = "driver_url";
|
||||
public static final String TYPE = "type";
|
||||
public static final String ONLY_SPECIFIED_DATABASE = "only_specified_database";
|
||||
public static final String LOWER_CASE_TABLE_NAMES = "lower_case_table_names";
|
||||
public static final String CHECK_SUM = "checksum";
|
||||
|
||||
private static final List<String> OPTIONAL_PROPERTIES = Lists.newArrayList(
|
||||
ONLY_SPECIFIED_DATABASE,
|
||||
LOWER_CASE_TABLE_NAMES
|
||||
);
|
||||
|
||||
// The default value of optional properties
|
||||
private static final Map<String, String> OPTIONAL_PROPERTIES_DEFAULT_VALUE = Maps.newHashMap();
|
||||
|
||||
static {
|
||||
OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(ONLY_SPECIFIED_DATABASE, "false");
|
||||
OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(LOWER_CASE_TABLE_NAMES, "false");
|
||||
}
|
||||
|
||||
// timeout for both connection and read. 10 seconds is long enough.
|
||||
private static final int HTTP_TIMEOUT_MS = 10000;
|
||||
@SerializedName(value = "configs")
|
||||
@ -121,6 +137,8 @@ public class JdbcResource extends Resource {
|
||||
replaceIfEffectiveValue(this.configs, USER, properties.get(USER));
|
||||
replaceIfEffectiveValue(this.configs, PASSWORD, properties.get(PASSWORD));
|
||||
replaceIfEffectiveValue(this.configs, TYPE, properties.get(TYPE));
|
||||
replaceIfEffectiveValue(this.configs, ONLY_SPECIFIED_DATABASE, properties.get(ONLY_SPECIFIED_DATABASE));
|
||||
replaceIfEffectiveValue(this.configs, LOWER_CASE_TABLE_NAMES, properties.get(LOWER_CASE_TABLE_NAMES));
|
||||
this.configs.put(JDBC_URL, handleJdbcUrl(getProperty(JDBC_URL)));
|
||||
super.modifyProperties(properties);
|
||||
}
|
||||
@ -135,6 +153,8 @@ public class JdbcResource extends Resource {
|
||||
copiedProperties.remove(USER);
|
||||
copiedProperties.remove(PASSWORD);
|
||||
copiedProperties.remove(TYPE);
|
||||
copiedProperties.remove(ONLY_SPECIFIED_DATABASE);
|
||||
copiedProperties.remove(LOWER_CASE_TABLE_NAMES);
|
||||
if (!copiedProperties.isEmpty()) {
|
||||
throw new AnalysisException("Unknown JDBC catalog resource properties: " + copiedProperties);
|
||||
}
|
||||
@ -144,22 +164,46 @@ public class JdbcResource extends Resource {
|
||||
protected void setProperties(Map<String, String> properties) throws DdlException {
|
||||
Preconditions.checkState(properties != null);
|
||||
for (String key : properties.keySet()) {
|
||||
if (!DRIVER_URL.equals(key) && !JDBC_URL.equals(key) && !USER.equals(key) && !PASSWORD.equals(key)
|
||||
&& !TYPE.equals(key) && !DRIVER_CLASS.equals(key)) {
|
||||
throw new DdlException("JDBC resource Property of " + key + " is unknown");
|
||||
switch (key) {
|
||||
case DRIVER_URL:
|
||||
case JDBC_URL:
|
||||
case USER:
|
||||
case PASSWORD:
|
||||
case TYPE:
|
||||
case DRIVER_CLASS:
|
||||
case ONLY_SPECIFIED_DATABASE: // optional argument
|
||||
case LOWER_CASE_TABLE_NAMES: // optional argument
|
||||
break;
|
||||
default:
|
||||
throw new DdlException("JDBC resource Property of " + key + " is unknown");
|
||||
}
|
||||
}
|
||||
configs = properties;
|
||||
handleOptionalArguments();
|
||||
checkProperties(DRIVER_URL);
|
||||
checkProperties(DRIVER_CLASS);
|
||||
checkProperties(JDBC_URL);
|
||||
checkProperties(USER);
|
||||
checkProperties(PASSWORD);
|
||||
checkProperties(TYPE);
|
||||
checkProperties(ONLY_SPECIFIED_DATABASE);
|
||||
checkProperties(LOWER_CASE_TABLE_NAMES);
|
||||
this.configs.put(JDBC_URL, handleJdbcUrl(getProperty(JDBC_URL)));
|
||||
configs.put(CHECK_SUM, computeObjectChecksum(getProperty(DRIVER_URL)));
|
||||
}
|
||||
|
||||
/**
|
||||
* This function used to handle optional arguments
|
||||
* eg: only_specified_database、lower_case_table_names
|
||||
*/
|
||||
private void handleOptionalArguments() {
|
||||
for (String s : OPTIONAL_PROPERTIES) {
|
||||
if (!configs.containsKey(s)) {
|
||||
configs.put(s, OPTIONAL_PROPERTIES_DEFAULT_VALUE.get(s));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, String> getCopiedProperties() {
|
||||
return Maps.newHashMap(configs);
|
||||
|
||||
@ -55,7 +55,6 @@ public class JdbcExternalDatabase extends ExternalDatabase<JdbcExternalTable> im
|
||||
super(extCatalog, id, name);
|
||||
}
|
||||
|
||||
// TODO(ftw): drew out the public multiple parts
|
||||
@Override
|
||||
protected void init() {
|
||||
InitDatabaseLog initDatabaseLog = new InitDatabaseLog();
|
||||
|
||||
@ -120,9 +120,18 @@ public class JdbcExternalCatalog extends ExternalCatalog {
|
||||
return catalogProperty.getOrDefault(JdbcResource.CHECK_SUM, "");
|
||||
}
|
||||
|
||||
public String getOnlySpecifiedDatabase() {
|
||||
return catalogProperty.getOrDefault(JdbcResource.ONLY_SPECIFIED_DATABASE, "false");
|
||||
}
|
||||
|
||||
public String getLowerCaseTableNames() {
|
||||
return catalogProperty.getOrDefault(JdbcResource.LOWER_CASE_TABLE_NAMES, "false");
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void initLocalObjectsImpl() {
|
||||
jdbcClient = new JdbcClient(getJdbcUser(), getJdbcPasswd(), getJdbcUrl(), getDriverUrl(), getDriverClass());
|
||||
jdbcClient = new JdbcClient(getJdbcUser(), getJdbcPasswd(), getJdbcUrl(), getDriverUrl(), getDriverClass(),
|
||||
getOnlySpecifiedDatabase(), getLowerCaseTableNames());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@ -26,6 +26,7 @@ import org.apache.doris.common.Config;
|
||||
import org.apache.doris.common.DdlException;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.zaxxer.hikari.HikariConfig;
|
||||
import com.zaxxer.hikari.HikariDataSource;
|
||||
import lombok.Data;
|
||||
@ -42,6 +43,7 @@ import java.sql.ResultSet;
|
||||
import java.sql.SQLException;
|
||||
import java.sql.Statement;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
@Getter
|
||||
public class JdbcClient {
|
||||
@ -56,8 +58,18 @@ public class JdbcClient {
|
||||
|
||||
private HikariDataSource dataSource = null;
|
||||
|
||||
public JdbcClient(String user, String password, String jdbcUrl, String driverUrl, String driverClass) {
|
||||
private boolean isOnlySpecifiedDatabase = false;
|
||||
|
||||
private boolean isLowerCaseTableNames = false;
|
||||
|
||||
// only used when isLowerCaseTableNames = true.
|
||||
private Map<String, String> lowerTableToRealTable = Maps.newHashMap();
|
||||
|
||||
public JdbcClient(String user, String password, String jdbcUrl, String driverUrl, String driverClass,
|
||||
String onlySpecifiedDatabase, String isLowerCaseTableNames) {
|
||||
this.jdbcUser = user;
|
||||
this.isOnlySpecifiedDatabase = Boolean.valueOf(onlySpecifiedDatabase).booleanValue();
|
||||
this.isLowerCaseTableNames = Boolean.valueOf(isLowerCaseTableNames).booleanValue();
|
||||
try {
|
||||
this.dbType = JdbcResource.parseDbType(jdbcUrl);
|
||||
} catch (DdlException e) {
|
||||
@ -153,6 +165,9 @@ public class JdbcClient {
|
||||
Connection conn = getConnection();
|
||||
Statement stmt = null;
|
||||
ResultSet rs = null;
|
||||
if (isOnlySpecifiedDatabase) {
|
||||
return getSpecifiedDatabase(conn);
|
||||
}
|
||||
List<String> databaseNames = Lists.newArrayList();
|
||||
try {
|
||||
stmt = conn.createStatement();
|
||||
@ -186,6 +201,30 @@ public class JdbcClient {
|
||||
return databaseNames;
|
||||
}
|
||||
|
||||
public List<String> getSpecifiedDatabase(Connection conn) {
|
||||
List<String> databaseNames = Lists.newArrayList();
|
||||
try {
|
||||
switch (dbType) {
|
||||
case JdbcResource.MYSQL:
|
||||
case JdbcResource.CLICKHOUSE:
|
||||
databaseNames.add(conn.getCatalog());
|
||||
break;
|
||||
case JdbcResource.POSTGRESQL:
|
||||
case JdbcResource.ORACLE:
|
||||
case JdbcResource.SQLSERVER:
|
||||
databaseNames.add(conn.getSchema());
|
||||
break;
|
||||
default:
|
||||
throw new JdbcClientException("Not supported jdbc type");
|
||||
}
|
||||
} catch (SQLException e) {
|
||||
throw new JdbcClientException("failed to get specified database name from jdbc", e);
|
||||
} finally {
|
||||
close(conn);
|
||||
}
|
||||
return databaseNames;
|
||||
}
|
||||
|
||||
/**
|
||||
* get all tables of one database
|
||||
*/
|
||||
@ -210,7 +249,12 @@ public class JdbcClient {
|
||||
throw new JdbcClientException("Unknown database type");
|
||||
}
|
||||
while (rs.next()) {
|
||||
tablesName.add(rs.getString("TABLE_NAME"));
|
||||
String tableName = rs.getString("TABLE_NAME");
|
||||
if (isLowerCaseTableNames) {
|
||||
lowerTableToRealTable.put(tableName.toLowerCase(), tableName);
|
||||
tableName = tableName.toLowerCase();
|
||||
}
|
||||
tablesName.add(tableName);
|
||||
}
|
||||
} catch (SQLException e) {
|
||||
throw new JdbcClientException("failed to get all tables for db %s", e, dbName);
|
||||
@ -280,6 +324,11 @@ public class JdbcClient {
|
||||
Connection conn = getConnection();
|
||||
ResultSet rs = null;
|
||||
List<JdbcFieldSchema> tableSchema = Lists.newArrayList();
|
||||
// if isLowerCaseTableNames == true, tableName is lower case
|
||||
// but databaseMetaData.getColumns() is case sensitive
|
||||
if (isLowerCaseTableNames) {
|
||||
tableName = lowerTableToRealTable.get(tableName);
|
||||
}
|
||||
try {
|
||||
DatabaseMetaData databaseMetaData = conn.getMetaData();
|
||||
// getColumns(String catalog, String schemaPattern, String tableNamePattern, String columnNamePattern)
|
||||
|
||||
Reference in New Issue
Block a user