[enhancement](jdbc catalog) Add lowercase column name mapping to Jdbc data source & optimize database and table mapping (#27124)

This PR adds the processing of lowercase Column names in Oracle Jdbc Catalog. In the previous behavior, we changed all Oracle columns to uppercase queries by default, but could not handle the lowercase case. This PR can solve this situation and improve All Jdbc Catalog works
This commit is contained in:
zy-kkk
2023-11-17 23:51:47 +08:00
committed by GitHub
parent 5d548935e0
commit b477839bce
13 changed files with 196 additions and 96 deletions

View File

@ -27,6 +27,8 @@ import org.apache.doris.thrift.TOdbcTableType;
import org.apache.doris.thrift.TTableDescriptor;
import org.apache.doris.thrift.TTableType;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Strings;
import com.google.common.collect.Maps;
import lombok.Setter;
@ -47,9 +49,12 @@ import java.util.stream.Collectors;
public class JdbcTable extends Table {
private static final Logger LOG = LogManager.getLogger(JdbcTable.class);
private static final ObjectMapper objectMapper = new ObjectMapper();
private static final String TABLE = "table";
private static final String REAL_DATABASE = "real_database";
private static final String REAL_TABLE = "real_table";
private static final String REAL_COLUMNS = "real_columns";
private static final String RESOURCE = "resource";
private static final String TABLE_TYPE = "table_type";
private static final String URL = "jdbc_url";
@ -65,6 +70,7 @@ public class JdbcTable extends Table {
// real name only for jdbc catalog
private String realDatabaseName;
private String realTableName;
private Map<String, String> realColumnNames;
private String jdbcTypeName;
@ -110,7 +116,7 @@ public class JdbcTable extends Table {
sb.append(getProperRealFullTableName(TABLE_TYPE_MAP.get(getTableTypeName())));
sb.append("(");
List<String> transformedInsertCols = insertCols.stream()
.map(col -> databaseProperName(TABLE_TYPE_MAP.get(getTableTypeName()), col))
.map(col -> getProperRealColumnName(TABLE_TYPE_MAP.get(getTableTypeName()), col))
.collect(Collectors.toList());
sb.append(String.join(",", transformedInsertCols));
sb.append(")");
@ -200,6 +206,7 @@ public class JdbcTable extends Table {
serializeMap.put(CHECK_SUM, checkSum);
serializeMap.put(REAL_DATABASE, realDatabaseName);
serializeMap.put(REAL_TABLE, realTableName);
serializeMap.put(REAL_COLUMNS, objectMapper.writeValueAsString(realColumnNames));
int size = (int) serializeMap.values().stream().filter(v -> {
return v != null;
@ -236,6 +243,9 @@ public class JdbcTable extends Table {
checkSum = serializeMap.get(CHECK_SUM);
realDatabaseName = serializeMap.get(REAL_DATABASE);
realTableName = serializeMap.get(REAL_TABLE);
String realColumnNamesJson = serializeMap.get(REAL_COLUMNS);
realColumnNames = objectMapper.readValue(realColumnNamesJson, new TypeReference<Map<String, String>>() {
});
}
public String getResourceName() {
@ -263,6 +273,14 @@ public class JdbcTable extends Table {
}
}
public String getProperRealColumnName(TOdbcTableType tableType, String columnName) {
if (realColumnNames == null || realColumnNames.isEmpty() || !realColumnNames.containsKey(columnName)) {
return databaseProperName(tableType, columnName);
} else {
return properNameWithRealName(tableType, realColumnNames.get(columnName));
}
}
public String getTableTypeName() {
return jdbcTypeName;
}
@ -358,14 +376,13 @@ public class JdbcTable extends Table {
* @param wrapEnd The character(s) to be added at the end of each name component.
* @param toUpperCase If true, convert the name to upper case.
* @param toLowerCase If true, convert the name to lower case.
* <p>
* Note: If both toUpperCase and toLowerCase are true, the name will ultimately be converted to lower case.
* <p>
* The name is expected to be in the format of 'schemaName.tableName'. If there is no '.',
* the function will treat the entire string as one name component.
* If there is a '.', the function will treat the string before the first '.' as the schema name
* and the string after the '.' as the table name.
*
* <p>
* Note: If both toUpperCase and toLowerCase are true, the name will ultimately be converted to lower case.
* <p>
* The name is expected to be in the format of 'schemaName.tableName'. If there is no '.',
* the function will treat the entire string as one name component.
* If there is a '.', the function will treat the string before the first '.' as the schema name
* and the string after the '.' as the table name.
* @return The formatted name.
*/
public static String formatName(String name, String wrapStart, String wrapEnd, boolean toUpperCase,
@ -386,18 +403,18 @@ public class JdbcTable extends Table {
/**
* Formats a database name according to the database type.
*
* <p>
* Rules:
* - MYSQL, OCEANBASE: Wrap with backticks (`), case unchanged. Example: mySchema.myTable -> `mySchema.myTable`
* - SQLSERVER: Wrap with square brackets ([]), case unchanged. Example: mySchema.myTable -> [mySchema].[myTable]
* - POSTGRESQL, CLICKHOUSE, TRINO, OCEANBASE_ORACLE, SAP_HANA: Wrap with double quotes ("), case unchanged.
* Example: mySchema.myTable -> "mySchema"."myTable"
* Example: mySchema.myTable -> "mySchema"."myTable"
* - ORACLE: Wrap with double quotes ("), convert to upper case. Example: mySchema.myTable -> "MYSCHEMA"."MYTABLE"
* For other types, the name is returned as is.
*
* @param tableType The database type.
* @param name The name to be formatted, expected in 'schemaName.tableName' format. If no '.', treats entire string
* as one name component. If '.', treats string before first '.' as schema name and after as table name.
* as one name component. If '.', treats string before first '.' as schema name and after as table name.
* @return The formatted name.
*/
public static String databaseProperName(TOdbcTableType tableType, String name) {

View File

@ -91,6 +91,8 @@ public class JdbcExternalTable extends ExternalTable {
jdbcTable.setRealDatabaseName(((JdbcExternalCatalog) catalog).getJdbcClient().getRealDatabaseName(this.dbName));
jdbcTable.setRealTableName(
((JdbcExternalCatalog) catalog).getJdbcClient().getRealTableName(this.dbName, this.name));
jdbcTable.setRealColumnNames(((JdbcExternalCatalog) catalog).getJdbcClient().getRealColumnNames(this.dbName,
this.name));
jdbcTable.setJdbcTypeName(jdbcCatalog.getDatabaseTypeName());
jdbcTable.setJdbcUrl(jdbcCatalog.getJdbcUrl());
jdbcTable.setJdbcUser(jdbcCatalog.getJdbcUser());

View File

@ -67,10 +67,15 @@ public abstract class JdbcClient {
// only used when isLowerCaseTableNames = true.
protected final ConcurrentHashMap<String, String> lowerDBToRealDB = new ConcurrentHashMap<>();
// only used when isLowerCaseTableNames = true.
protected final ConcurrentHashMap<String, String> lowerTableToRealTable = new ConcurrentHashMap<>();
protected final ConcurrentHashMap<String, ConcurrentHashMap<String, String>> lowerTableToRealTable
= new ConcurrentHashMap<>();
// only used when isLowerCaseTableNames = true.
protected final ConcurrentHashMap<String, ConcurrentHashMap<String, ConcurrentHashMap<String, String>>>
lowerColumnToRealColumn = new ConcurrentHashMap<>();
private final AtomicBoolean dbNamesLoaded = new AtomicBoolean(false);
private final AtomicBoolean tableNamesLoaded = new AtomicBoolean(false);
private final AtomicBoolean columnNamesLoaded = new AtomicBoolean(false);
public static JdbcClient createJdbcClient(JdbcClientConfig jdbcClientConfig) {
String dbType = parseDbType(jdbcClientConfig.getJdbcUrl());
@ -178,7 +183,7 @@ public abstract class JdbcClient {
if (closeable != null) {
try {
closeable.close();
} catch (Exception e) {
} catch (Exception e) {
throw new JdbcClientException("Can not close : ", e);
}
}
@ -186,8 +191,10 @@ public abstract class JdbcClient {
}
// This part used to process meta-information of database, table and column.
/**
* get all database name through JDBC
*
* @return list of database names
*/
public List<String> getDatabaseNameList() {
@ -208,6 +215,8 @@ public abstract class JdbcClient {
if (isLowerCaseTableNames) {
lowerDBToRealDB.put(databaseName.toLowerCase(), databaseName);
databaseName = databaseName.toLowerCase();
} else {
lowerDBToRealDB.put(databaseName, databaseName);
}
tempDatabaseNames.add(databaseName);
}
@ -237,20 +246,20 @@ public abstract class JdbcClient {
* get all tables of one database
*/
public List<String> getTablesNameList(String dbName) {
String currentDbName = dbName;
List<String> tablesName = Lists.newArrayList();
String[] tableTypes = getTableTypes();
if (isLowerCaseTableNames) {
currentDbName = getRealDatabaseName(dbName);
}
String finalDbName = currentDbName;
String finalDbName = getRealDatabaseName(dbName);
processTable(finalDbName, null, tableTypes, (rs) -> {
try {
while (rs.next()) {
String tableName = rs.getString("TABLE_NAME");
if (isLowerCaseTableNames) {
lowerTableToRealTable.put(tableName.toLowerCase(), tableName);
lowerTableToRealTable.putIfAbsent(finalDbName, new ConcurrentHashMap<>());
lowerTableToRealTable.get(finalDbName).put(tableName.toLowerCase(), tableName);
tableName = tableName.toLowerCase();
} else {
lowerTableToRealTable.putIfAbsent(finalDbName, new ConcurrentHashMap<>());
lowerTableToRealTable.get(finalDbName).put(tableName, tableName);
}
tablesName.add(tableName);
}
@ -262,16 +271,10 @@ public abstract class JdbcClient {
}
public boolean isTableExist(String dbName, String tableName) {
String currentDbName = dbName;
String currentTableName = tableName;
final boolean[] isExist = {false};
if (isLowerCaseTableNames) {
currentDbName = getRealDatabaseName(dbName);
currentTableName = getRealTableName(dbName, tableName);
}
String[] tableTypes = getTableTypes();
String finalTableName = currentTableName;
String finalDbName = currentDbName;
String finalDbName = getRealDatabaseName(dbName);
String finalTableName = getRealTableName(dbName, tableName);
processTable(finalDbName, finalTableName, tableTypes, (rs) -> {
try {
if (rs.next()) {
@ -292,23 +295,25 @@ public abstract class JdbcClient {
Connection conn = getConnection();
ResultSet rs = null;
List<JdbcFieldSchema> tableSchema = Lists.newArrayList();
// if isLowerCaseTableNames == true, tableName is lower case
// but databaseMetaData.getColumns() is case sensitive
String currentDbName = dbName;
String currentTableName = tableName;
if (isLowerCaseTableNames) {
currentDbName = getRealDatabaseName(dbName);
currentTableName = getRealTableName(dbName, tableName);
}
String finalDbName = currentDbName;
String finalTableName = currentTableName;
String finalDbName = getRealDatabaseName(dbName);
String finalTableName = getRealTableName(dbName, tableName);
try {
DatabaseMetaData databaseMetaData = conn.getMetaData();
String catalogName = getCatalogName(conn);
rs = getColumns(databaseMetaData, catalogName, finalDbName, finalTableName);
while (rs.next()) {
lowerColumnToRealColumn.putIfAbsent(finalDbName, new ConcurrentHashMap<>());
lowerColumnToRealColumn.get(finalDbName).putIfAbsent(finalTableName, new ConcurrentHashMap<>());
JdbcFieldSchema field = new JdbcFieldSchema();
field.setColumnName(rs.getString("COLUMN_NAME"));
String columnName = rs.getString("COLUMN_NAME");
if (isLowerCaseTableNames) {
lowerColumnToRealColumn.get(finalDbName).get(finalTableName)
.put(columnName.toLowerCase(), columnName);
columnName = columnName.toLowerCase();
} else {
lowerColumnToRealColumn.get(finalDbName).get(finalTableName).put(columnName, columnName);
}
field.setColumnName(columnName);
field.setDataType(rs.getInt("DATA_TYPE"));
field.setDataTypeName(rs.getString("TYPE_NAME"));
/*
@ -352,11 +357,9 @@ public abstract class JdbcClient {
}
public String getRealDatabaseName(String dbname) {
if (!isLowerCaseTableNames) {
return dbname;
}
if (lowerDBToRealDB.isEmpty() || !lowerDBToRealDB.containsKey(dbname)) {
if (lowerDBToRealDB == null
|| lowerDBToRealDB.isEmpty()
|| !lowerDBToRealDB.containsKey(dbname)) {
loadDatabaseNamesIfNeeded();
}
@ -364,15 +367,34 @@ public abstract class JdbcClient {
}
public String getRealTableName(String dbName, String tableName) {
if (!isLowerCaseTableNames) {
return tableName;
}
if (lowerTableToRealTable.isEmpty() || !lowerTableToRealTable.containsKey(tableName)) {
String realDbName = getRealDatabaseName(dbName);
if (lowerTableToRealTable == null
|| lowerTableToRealTable.isEmpty()
|| !lowerTableToRealTable.containsKey(realDbName)
|| lowerTableToRealTable.get(realDbName) == null
|| lowerTableToRealTable.get(realDbName).isEmpty()
|| !lowerTableToRealTable.get(realDbName).containsKey(tableName)
|| lowerTableToRealTable.get(realDbName).get(tableName) == null) {
loadTableNamesIfNeeded(dbName);
}
return lowerTableToRealTable.get(tableName);
return lowerTableToRealTable.get(realDbName).get(tableName);
}
public Map<String, String> getRealColumnNames(String dbName, String tableName) {
String realDbName = getRealDatabaseName(dbName);
String realTableName = getRealTableName(dbName, tableName);
if (lowerColumnToRealColumn == null
|| lowerColumnToRealColumn.isEmpty()
|| !lowerColumnToRealColumn.containsKey(realDbName)
|| lowerColumnToRealColumn.get(realDbName) == null
|| lowerColumnToRealColumn.get(realDbName).isEmpty()
|| !lowerColumnToRealColumn.get(realDbName).containsKey(realTableName)
|| lowerColumnToRealColumn.get(realDbName).get(realTableName) == null
|| lowerColumnToRealColumn.get(realDbName).get(realTableName).isEmpty()) {
loadColumnNamesIfNeeded(dbName, tableName);
}
return lowerColumnToRealColumn.get(realDbName).get(realTableName);
}
private void loadDatabaseNamesIfNeeded() {
@ -387,6 +409,12 @@ public abstract class JdbcClient {
}
}
private void loadColumnNamesIfNeeded(String dbName, String tableName) {
if (columnNamesLoaded.compareAndSet(false, true)) {
getJdbcColumnsInfo(dbName, tableName);
}
}
// protected methods,for subclass to override
protected String getCatalogName(Connection conn) throws SQLException {
return null;
@ -411,7 +439,7 @@ public abstract class JdbcClient {
}
protected void processTable(String dbName, String tableName, String[] tableTypes,
Consumer<ResultSet> resultSetConsumer) {
Consumer<ResultSet> resultSetConsumer) {
Connection conn = getConnection();
ResultSet rs = null;
try {
@ -435,7 +463,7 @@ public abstract class JdbcClient {
}
protected ResultSet getColumns(DatabaseMetaData databaseMetaData, String catalogName, String schemaName,
String tableName) throws SQLException {
String tableName) throws SQLException {
return databaseMetaData.getColumns(catalogName, schemaName, tableName, null);
}

View File

@ -35,6 +35,7 @@ import java.sql.SQLException;
import java.sql.Statement;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
public class JdbcMySQLClient extends JdbcClient {
@ -117,21 +118,27 @@ public class JdbcMySQLClient extends JdbcClient {
Connection conn = getConnection();
ResultSet rs = null;
List<JdbcFieldSchema> tableSchema = com.google.common.collect.Lists.newArrayList();
// if isLowerCaseTableNames == true, tableName is lower case
// but databaseMetaData.getColumns() is case sensitive
if (isLowerCaseTableNames) {
dbName = lowerDBToRealDB.get(dbName);
tableName = lowerTableToRealTable.get(tableName);
}
String finalDbName = getRealDatabaseName(dbName);
String finalTableName = getRealTableName(dbName, tableName);
try {
DatabaseMetaData databaseMetaData = conn.getMetaData();
String catalogName = getCatalogName(conn);
rs = getColumns(databaseMetaData, catalogName, dbName, tableName);
rs = getColumns(databaseMetaData, catalogName, finalDbName, finalTableName);
List<String> primaryKeys = getPrimaryKeys(databaseMetaData, catalogName, dbName, tableName);
Map<String, String> mapFieldtoType = null;
while (rs.next()) {
lowerColumnToRealColumn.putIfAbsent(finalDbName, new ConcurrentHashMap<>());
lowerColumnToRealColumn.get(finalDbName).putIfAbsent(finalTableName, new ConcurrentHashMap<>());
JdbcFieldSchema field = new JdbcFieldSchema();
field.setColumnName(rs.getString("COLUMN_NAME"));
String columnName = rs.getString("COLUMN_NAME");
if (isLowerCaseTableNames) {
lowerColumnToRealColumn.get(finalDbName).get(finalTableName)
.put(columnName.toLowerCase(), columnName);
columnName = columnName.toLowerCase();
} else {
lowerColumnToRealColumn.get(finalDbName).get(finalTableName).put(columnName, columnName);
}
field.setColumnName(columnName);
field.setDataType(rs.getInt("DATA_TYPE"));
// in mysql-jdbc-connector-8.0.*, TYPE_NAME of the HLL column in doris will be "UNKNOWN"

View File

@ -28,6 +28,7 @@ import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
public class JdbcOracleClient extends JdbcClient {
@ -61,6 +62,8 @@ public class JdbcOracleClient extends JdbcClient {
if (isLowerCaseTableNames) {
lowerDBToRealDB.put(databaseName.toLowerCase(), databaseName);
databaseName = databaseName.toLowerCase();
} else {
lowerDBToRealDB.put(databaseName, databaseName);
}
tempDatabaseNames.add(databaseName);
}
@ -91,14 +94,8 @@ public class JdbcOracleClient extends JdbcClient {
Connection conn = getConnection();
ResultSet rs = null;
List<JdbcFieldSchema> tableSchema = Lists.newArrayList();
String currentDbName = dbName;
String currentTableName = tableName;
if (isLowerCaseTableNames) {
currentDbName = getRealDatabaseName(dbName);
currentTableName = getRealTableName(dbName, tableName);
}
String finalDbName = currentDbName;
String finalTableName = currentTableName;
String finalDbName = getRealDatabaseName(dbName);
String finalTableName = getRealTableName(dbName, tableName);
try {
DatabaseMetaData databaseMetaData = conn.getMetaData();
String catalogName = getCatalogName(conn);
@ -119,8 +116,18 @@ public class JdbcOracleClient extends JdbcClient {
if (isModify && isTableModified(rs.getString("TABLE_NAME"), finalTableName)) {
continue;
}
lowerColumnToRealColumn.putIfAbsent(finalDbName, new ConcurrentHashMap<>());
lowerColumnToRealColumn.get(finalDbName).putIfAbsent(finalTableName, new ConcurrentHashMap<>());
JdbcFieldSchema field = new JdbcFieldSchema();
field.setColumnName(rs.getString("COLUMN_NAME"));
String columnName = rs.getString("COLUMN_NAME");
if (isLowerCaseTableNames) {
lowerColumnToRealColumn.get(finalDbName).get(finalTableName)
.put(columnName.toLowerCase(), columnName);
columnName = columnName.toLowerCase();
} else {
lowerColumnToRealColumn.get(finalDbName).get(finalTableName).put(columnName, columnName);
}
field.setColumnName(columnName);
field.setDataType(rs.getInt("DATA_TYPE"));
field.setDataTypeName(rs.getString("TYPE_NAME"));
/*

View File

@ -138,7 +138,7 @@ public class JdbcScanNode extends ExternalScanNode {
List<Expr> pushDownConjuncts = collectConjunctsToPushDown(conjunctsList, errors);
for (Expr individualConjunct : pushDownConjuncts) {
String filter = conjunctExprToString(jdbcType, individualConjunct);
String filter = conjunctExprToString(jdbcType, individualConjunct, tbl);
filters.add(filter);
conjuncts.remove(individualConjunct);
}
@ -169,9 +169,9 @@ public class JdbcScanNode extends ExternalScanNode {
continue;
}
Column col = slot.getColumn();
columns.add(JdbcTable.databaseProperName(jdbcType, col.getName()));
columns.add(tbl.getProperRealColumnName(jdbcType, col.getName()));
}
if (0 == columns.size()) {
if (columns.isEmpty()) {
columns.add("*");
}
}
@ -324,12 +324,12 @@ public class JdbcScanNode extends ExternalScanNode {
return !fnExprList.isEmpty();
}
public static String conjunctExprToString(TOdbcTableType tableType, Expr expr) {
public static String conjunctExprToString(TOdbcTableType tableType, Expr expr, JdbcTable tbl) {
if (expr instanceof CompoundPredicate) {
StringBuilder result = new StringBuilder();
CompoundPredicate compoundPredicate = (CompoundPredicate) expr;
for (Expr child : compoundPredicate.getChildren()) {
result.append(conjunctExprToString(tableType, child));
result.append(conjunctExprToString(tableType, child, tbl));
result.append(" ").append(compoundPredicate.getOp().toString()).append(" ");
}
// Remove the last operator
@ -357,7 +357,11 @@ public class JdbcScanNode extends ExternalScanNode {
ArrayList<Expr> children = expr.getChildren();
String filter;
if (children.get(0) instanceof SlotRef) {
filter = JdbcTable.databaseProperName(tableType, children.get(0).toMySql());
if (tbl != null) {
filter = tbl.getProperRealColumnName(tableType, children.get(0).toMySql());
} else {
filter = JdbcTable.databaseProperName(tableType, children.get(0).toMySql());
}
} else {
filter = children.get(0).toMySql();
}

View File

@ -184,7 +184,7 @@ public class OdbcScanNode extends ExternalScanNode {
ArrayList<Expr> odbcConjuncts = Expr.cloneList(conjuncts, sMap);
for (Expr p : odbcConjuncts) {
if (shouldPushDownConjunct(odbcType, p)) {
String filter = JdbcScanNode.conjunctExprToString(odbcType, p);
String filter = JdbcScanNode.conjunctExprToString(odbcType, p, null);
filters.add(filter);
conjuncts.remove(p);
}