[feature](multi-catalog) support oceanbase jdbc catalog and jdbc external table (#18943)
* [feature](multi-catalog) support oceanbase jdbc catalog and jdbc external table
This commit is contained in:
@ -69,6 +69,7 @@ public class JdbcResource extends Resource {
|
||||
public static final String JDBC_CLICKHOUSE = "jdbc:clickhouse";
|
||||
public static final String JDBC_SAP_HANA = "jdbc:sap";
|
||||
public static final String JDBC_TRINO = "jdbc:trino";
|
||||
public static final String JDBC_OCEANBASE = "jdbc:oceanbase";
|
||||
|
||||
public static final String MYSQL = "MYSQL";
|
||||
public static final String POSTGRESQL = "POSTGRESQL";
|
||||
@ -77,6 +78,8 @@ public class JdbcResource extends Resource {
|
||||
public static final String CLICKHOUSE = "CLICKHOUSE";
|
||||
public static final String SAP_HANA = "SAP_HANA";
|
||||
public static final String TRINO = "TRINO";
|
||||
public static final String OCEANBASE = "OCEANBASE";
|
||||
public static final String OCEANBASE_ORACLE = "OCEANBASE_ORACLE";
|
||||
|
||||
public static final String JDBC_PROPERTIES_PREFIX = "jdbc.";
|
||||
public static final String JDBC_URL = "jdbc_url";
|
||||
@ -87,6 +90,7 @@ public class JdbcResource extends Resource {
|
||||
public static final String TYPE = "type";
|
||||
public static final String ONLY_SPECIFIED_DATABASE = "only_specified_database";
|
||||
public static final String LOWER_CASE_TABLE_NAMES = "lower_case_table_names";
|
||||
public static final String OCEANBASE_MODE = "oceanbase_mode";
|
||||
public static final String CHECK_SUM = "checksum";
|
||||
private static final ImmutableList<String> ALL_PROPERTIES = new ImmutableList.Builder<String>().add(
|
||||
JDBC_URL,
|
||||
@ -97,12 +101,14 @@ public class JdbcResource extends Resource {
|
||||
TYPE,
|
||||
ONLY_SPECIFIED_DATABASE,
|
||||
LOWER_CASE_TABLE_NAMES,
|
||||
SPECIFIED_DATABASE_LIST
|
||||
SPECIFIED_DATABASE_LIST,
|
||||
OCEANBASE_MODE
|
||||
).build();
|
||||
private static final ImmutableList<String> OPTIONAL_PROPERTIES = new ImmutableList.Builder<String>().add(
|
||||
ONLY_SPECIFIED_DATABASE,
|
||||
LOWER_CASE_TABLE_NAMES,
|
||||
SPECIFIED_DATABASE_LIST
|
||||
SPECIFIED_DATABASE_LIST,
|
||||
OCEANBASE_MODE
|
||||
).build();
|
||||
|
||||
// The default value of optional properties
|
||||
@ -113,6 +119,7 @@ public class JdbcResource extends Resource {
|
||||
OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(ONLY_SPECIFIED_DATABASE, "false");
|
||||
OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(LOWER_CASE_TABLE_NAMES, "false");
|
||||
OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(SPECIFIED_DATABASE_LIST, "");
|
||||
OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(OCEANBASE_MODE, "");
|
||||
}
|
||||
|
||||
// timeout for both connection and read. 10 seconds is long enough.
|
||||
@ -139,7 +146,7 @@ public class JdbcResource extends Resource {
|
||||
for (String propertyKey : ALL_PROPERTIES) {
|
||||
replaceIfEffectiveValue(this.configs, propertyKey, properties.get(propertyKey));
|
||||
}
|
||||
this.configs.put(JDBC_URL, handleJdbcUrl(getProperty(JDBC_URL)));
|
||||
this.configs.put(JDBC_URL, handleJdbcUrl(getProperty(JDBC_URL), getProperty(OCEANBASE_MODE)));
|
||||
super.modifyProperties(properties);
|
||||
}
|
||||
|
||||
@ -172,7 +179,7 @@ public class JdbcResource extends Resource {
|
||||
throw new DdlException("JdbcResource Missing " + property + " in properties");
|
||||
}
|
||||
}
|
||||
this.configs.put(JDBC_URL, handleJdbcUrl(getProperty(JDBC_URL)));
|
||||
this.configs.put(JDBC_URL, handleJdbcUrl(getProperty(JDBC_URL), getProperty(OCEANBASE_MODE)));
|
||||
configs.put(CHECK_SUM, computeObjectChecksum(getProperty(DRIVER_URL)));
|
||||
}
|
||||
|
||||
@ -255,7 +262,7 @@ public class JdbcResource extends Resource {
|
||||
}
|
||||
}
|
||||
|
||||
public static String parseDbType(String url) throws DdlException {
|
||||
public static String parseDbType(String url, String oceanbaseMode) throws DdlException {
|
||||
if (url.startsWith(JDBC_MYSQL) || url.startsWith(JDBC_MARIADB)) {
|
||||
return MYSQL;
|
||||
} else if (url.startsWith(JDBC_POSTGRESQL)) {
|
||||
@ -270,15 +277,27 @@ public class JdbcResource extends Resource {
|
||||
return SAP_HANA;
|
||||
} else if (url.startsWith(JDBC_TRINO)) {
|
||||
return TRINO;
|
||||
} else if (url.startsWith(JDBC_OCEANBASE)) {
|
||||
if (oceanbaseMode == null || oceanbaseMode.isEmpty()) {
|
||||
throw new DdlException("OceanBase mode must be specified for OceanBase databases"
|
||||
+ "(either 'mysql' or 'oracle')");
|
||||
}
|
||||
if (oceanbaseMode.equalsIgnoreCase("mysql")) {
|
||||
return OCEANBASE;
|
||||
} else if (oceanbaseMode.equalsIgnoreCase("oracle")) {
|
||||
return OCEANBASE_ORACLE;
|
||||
} else {
|
||||
throw new DdlException("Invalid OceanBase mode: " + oceanbaseMode + ". Must be 'mysql' or 'oracle'");
|
||||
}
|
||||
}
|
||||
throw new DdlException("Unsupported jdbc database type, please check jdbcUrl: " + url);
|
||||
}
|
||||
|
||||
public static String handleJdbcUrl(String jdbcUrl) throws DdlException {
|
||||
public static String handleJdbcUrl(String jdbcUrl, String oceanbaseMode) throws DdlException {
|
||||
// delete all space in jdbcUrl
|
||||
String newJdbcUrl = jdbcUrl.replaceAll(" ", "");
|
||||
String dbType = parseDbType(newJdbcUrl);
|
||||
if (dbType.equals(MYSQL)) {
|
||||
String dbType = parseDbType(newJdbcUrl, oceanbaseMode);
|
||||
if (dbType.equals(MYSQL) || dbType.equals(OCEANBASE)) {
|
||||
// `yearIsDateType` is a parameter of JDBC, and the default is true.
|
||||
// We force the use of `yearIsDateType=false`
|
||||
newJdbcUrl = checkAndSetJdbcBoolParam(newJdbcUrl, "yearIsDateType", "true", "false");
|
||||
|
||||
@ -39,6 +39,7 @@ import java.io.DataInput;
|
||||
import java.io.DataOutput;
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
@ -76,6 +77,8 @@ public class JdbcTable extends Table {
|
||||
tempMap.put("clickhouse", TOdbcTableType.CLICKHOUSE);
|
||||
tempMap.put("sap_hana", TOdbcTableType.SAP_HANA);
|
||||
tempMap.put("trino", TOdbcTableType.TRINO);
|
||||
tempMap.put("oceanbase", TOdbcTableType.OCEANBASE);
|
||||
tempMap.put("oceanbase_oracle", TOdbcTableType.OCEANBASE_ORACLE);
|
||||
TABLE_TYPE_MAP = Collections.unmodifiableMap(tempMap);
|
||||
}
|
||||
|
||||
@ -267,7 +270,11 @@ public class JdbcTable extends Table {
|
||||
if (Strings.isNullOrEmpty(jdbcTypeName)) {
|
||||
throw new DdlException("property " + TABLE_TYPE + " must be set");
|
||||
}
|
||||
if (!TABLE_TYPE_MAP.containsKey(jdbcTypeName.toLowerCase())) {
|
||||
|
||||
Map<String, TOdbcTableType> tableTypeMapWithoutOceanbaseOracle = new HashMap<>(TABLE_TYPE_MAP);
|
||||
tableTypeMapWithoutOceanbaseOracle.remove("oceanbase_oracle");
|
||||
|
||||
if (!tableTypeMapWithoutOceanbaseOracle.containsKey(jdbcTypeName.toLowerCase())) {
|
||||
throw new DdlException("Unknown jdbc table type: " + jdbcTypeName);
|
||||
}
|
||||
|
||||
@ -286,5 +293,19 @@ public class JdbcTable extends Table {
|
||||
driverClass = jdbcResource.getProperty(DRIVER_CLASS);
|
||||
driverUrl = jdbcResource.getProperty(DRIVER_URL);
|
||||
checkSum = jdbcResource.getProperty(CHECK_SUM);
|
||||
|
||||
// get oceanbase_mode
|
||||
String oceanbaseMode = jdbcResource.getProperty("oceanbase_mode");
|
||||
|
||||
// by oceanbase_mode set jdbcTypeName
|
||||
if ("oceanbase".equalsIgnoreCase(jdbcTypeName)) {
|
||||
if ("mysql".equalsIgnoreCase(oceanbaseMode)) {
|
||||
jdbcTypeName = "oceanbase";
|
||||
} else if ("oracle".equalsIgnoreCase(oceanbaseMode)) {
|
||||
jdbcTypeName = "oceanbase_oracle";
|
||||
} else {
|
||||
throw new DdlException("Unknown oceanbase_mode: " + oceanbaseMode);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -112,9 +112,15 @@ public class OdbcTable extends Table {
|
||||
return list.stream().map(s -> "\"" + s + "\"").collect(Collectors.joining("."));
|
||||
}
|
||||
|
||||
private static String oceanbaseOracleProperName(String name) {
|
||||
List<String> list = Arrays.asList(name.split("\\."));
|
||||
return list.stream().map(s -> "\"" + s + "\"").collect(Collectors.joining("."));
|
||||
}
|
||||
|
||||
public static String databaseProperName(TOdbcTableType tableType, String name) {
|
||||
switch (tableType) {
|
||||
case MYSQL:
|
||||
case OCEANBASE:
|
||||
return mysqlProperName(name);
|
||||
case SQLSERVER:
|
||||
return mssqlProperName(name);
|
||||
@ -128,6 +134,8 @@ public class OdbcTable extends Table {
|
||||
return saphanaProperName(name);
|
||||
case TRINO:
|
||||
return trinoProperName(name);
|
||||
case OCEANBASE_ORACLE:
|
||||
return oceanbaseOracleProperName(name);
|
||||
default:
|
||||
return name;
|
||||
}
|
||||
|
||||
@ -80,8 +80,9 @@ public class JdbcExternalCatalog extends ExternalCatalog {
|
||||
properties.put(StringUtils.removeStart(kv.getKey(), JdbcResource.JDBC_PROPERTIES_PREFIX), kv.getValue());
|
||||
}
|
||||
String jdbcUrl = properties.getOrDefault(JdbcResource.JDBC_URL, "");
|
||||
String oceanbaseMode = properties.getOrDefault(JdbcResource.OCEANBASE_MODE, "");
|
||||
if (!Strings.isNullOrEmpty(jdbcUrl)) {
|
||||
jdbcUrl = JdbcResource.handleJdbcUrl(jdbcUrl);
|
||||
jdbcUrl = JdbcResource.handleJdbcUrl(jdbcUrl, oceanbaseMode);
|
||||
properties.put(JdbcResource.JDBC_URL, jdbcUrl);
|
||||
}
|
||||
|
||||
@ -132,10 +133,14 @@ public class JdbcExternalCatalog extends ExternalCatalog {
|
||||
return catalogProperty.getOrDefault(JdbcResource.SPECIFIED_DATABASE_LIST, "");
|
||||
}
|
||||
|
||||
public String getOceanBaseMode() {
|
||||
return catalogProperty.getOrDefault(JdbcResource.OCEANBASE_MODE, "");
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void initLocalObjectsImpl() {
|
||||
jdbcClient = new JdbcClient(getJdbcUser(), getJdbcPasswd(), getJdbcUrl(), getDriverUrl(), getDriverClass(),
|
||||
getOnlySpecifiedDatabase(), getLowerCaseTableNames(), getSpecifiedDatabaseMap());
|
||||
getOnlySpecifiedDatabase(), getLowerCaseTableNames(), getSpecifiedDatabaseMap(), getOceanBaseMode());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@ -66,16 +66,20 @@ public class JdbcClient {
|
||||
// only used when isLowerCaseTableNames = true.
|
||||
private Map<String, String> lowerTableToRealTable = Maps.newHashMap();
|
||||
|
||||
private String oceanbaseMode = "";
|
||||
|
||||
public JdbcClient(String user, String password, String jdbcUrl, String driverUrl, String driverClass,
|
||||
String onlySpecifiedDatabase, String isLowerCaseTableNames, Map specifiedDatabaseMap) {
|
||||
String onlySpecifiedDatabase, String isLowerCaseTableNames, Map specifiedDatabaseMap,
|
||||
String oceanbaseMode) {
|
||||
this.jdbcUser = user;
|
||||
this.isOnlySpecifiedDatabase = Boolean.valueOf(onlySpecifiedDatabase).booleanValue();
|
||||
this.isLowerCaseTableNames = Boolean.valueOf(isLowerCaseTableNames).booleanValue();
|
||||
if (specifiedDatabaseMap != null) {
|
||||
this.specifiedDatabaseMap = specifiedDatabaseMap;
|
||||
}
|
||||
this.oceanbaseMode = oceanbaseMode;
|
||||
try {
|
||||
this.dbType = JdbcResource.parseDbType(jdbcUrl);
|
||||
this.dbType = JdbcResource.parseDbType(jdbcUrl, oceanbaseMode);
|
||||
} catch (DdlException e) {
|
||||
throw new JdbcClientException("Failed to parse db type from jdbcUrl: " + jdbcUrl, e);
|
||||
}
|
||||
@ -185,6 +189,7 @@ public class JdbcClient {
|
||||
switch (dbType) {
|
||||
case JdbcResource.MYSQL:
|
||||
case JdbcResource.CLICKHOUSE:
|
||||
case JdbcResource.OCEANBASE:
|
||||
rs = stmt.executeQuery("SHOW DATABASES");
|
||||
break;
|
||||
case JdbcResource.POSTGRESQL:
|
||||
@ -192,6 +197,7 @@ public class JdbcClient {
|
||||
+ "'" + jdbcUser + "', nspname, 'USAGE');");
|
||||
break;
|
||||
case JdbcResource.ORACLE:
|
||||
case JdbcResource.OCEANBASE_ORACLE:
|
||||
rs = stmt.executeQuery("SELECT DISTINCT OWNER FROM all_tables");
|
||||
break;
|
||||
case JdbcResource.SQLSERVER:
|
||||
@ -233,6 +239,7 @@ public class JdbcClient {
|
||||
switch (dbType) {
|
||||
case JdbcResource.MYSQL:
|
||||
case JdbcResource.CLICKHOUSE:
|
||||
case JdbcResource.OCEANBASE:
|
||||
databaseNames.add(conn.getCatalog());
|
||||
break;
|
||||
case JdbcResource.POSTGRESQL:
|
||||
@ -240,6 +247,7 @@ public class JdbcClient {
|
||||
case JdbcResource.SQLSERVER:
|
||||
case JdbcResource.SAP_HANA:
|
||||
case JdbcResource.TRINO:
|
||||
case JdbcResource.OCEANBASE_ORACLE:
|
||||
databaseNames.add(conn.getSchema());
|
||||
break;
|
||||
default:
|
||||
@ -266,6 +274,7 @@ public class JdbcClient {
|
||||
String catalogName = conn.getCatalog();
|
||||
switch (dbType) {
|
||||
case JdbcResource.MYSQL:
|
||||
case JdbcResource.OCEANBASE:
|
||||
rs = databaseMetaData.getTables(dbName, null, null, types);
|
||||
break;
|
||||
case JdbcResource.POSTGRESQL:
|
||||
@ -273,6 +282,7 @@ public class JdbcClient {
|
||||
case JdbcResource.CLICKHOUSE:
|
||||
case JdbcResource.SQLSERVER:
|
||||
case JdbcResource.SAP_HANA:
|
||||
case JdbcResource.OCEANBASE_ORACLE:
|
||||
rs = databaseMetaData.getTables(null, dbName, null, types);
|
||||
break;
|
||||
case JdbcResource.TRINO:
|
||||
@ -306,6 +316,7 @@ public class JdbcClient {
|
||||
String catalogName = conn.getCatalog();
|
||||
switch (dbType) {
|
||||
case JdbcResource.MYSQL:
|
||||
case JdbcResource.OCEANBASE:
|
||||
rs = databaseMetaData.getTables(dbName, null, tableName, types);
|
||||
break;
|
||||
case JdbcResource.POSTGRESQL:
|
||||
@ -313,6 +324,7 @@ public class JdbcClient {
|
||||
case JdbcResource.CLICKHOUSE:
|
||||
case JdbcResource.SQLSERVER:
|
||||
case JdbcResource.SAP_HANA:
|
||||
case JdbcResource.OCEANBASE_ORACLE:
|
||||
rs = databaseMetaData.getTables(null, dbName, null, types);
|
||||
break;
|
||||
case JdbcResource.TRINO:
|
||||
@ -381,6 +393,7 @@ public class JdbcClient {
|
||||
// Can contain single-character wildcards ("_"), or multi-character wildcards ("%")
|
||||
switch (dbType) {
|
||||
case JdbcResource.MYSQL:
|
||||
case JdbcResource.OCEANBASE:
|
||||
rs = databaseMetaData.getColumns(dbName, null, tableName, null);
|
||||
break;
|
||||
case JdbcResource.POSTGRESQL:
|
||||
@ -388,6 +401,7 @@ public class JdbcClient {
|
||||
case JdbcResource.CLICKHOUSE:
|
||||
case JdbcResource.SQLSERVER:
|
||||
case JdbcResource.SAP_HANA:
|
||||
case JdbcResource.OCEANBASE_ORACLE:
|
||||
rs = databaseMetaData.getColumns(null, dbName, tableName, null);
|
||||
break;
|
||||
case JdbcResource.TRINO:
|
||||
@ -427,12 +441,14 @@ public class JdbcClient {
|
||||
public Type jdbcTypeToDoris(JdbcFieldSchema fieldSchema) {
|
||||
switch (dbType) {
|
||||
case JdbcResource.MYSQL:
|
||||
case JdbcResource.OCEANBASE:
|
||||
return mysqlTypeToDoris(fieldSchema);
|
||||
case JdbcResource.POSTGRESQL:
|
||||
return postgresqlTypeToDoris(fieldSchema);
|
||||
case JdbcResource.CLICKHOUSE:
|
||||
return clickhouseTypeToDoris(fieldSchema);
|
||||
case JdbcResource.ORACLE:
|
||||
case JdbcResource.OCEANBASE_ORACLE:
|
||||
return oracleTypeToDoris(fieldSchema);
|
||||
case JdbcResource.SQLSERVER:
|
||||
return sqlserverTypeToDoris(fieldSchema);
|
||||
@ -876,7 +892,6 @@ public class JdbcClient {
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private Type createDecimalOrStringType(int precision, int scale) {
|
||||
if (precision <= ScalarType.MAX_DECIMAL128_PRECISION) {
|
||||
return ScalarType.createDecimalV3Type(precision, scale);
|
||||
|
||||
Reference in New Issue
Block a user