[Refactor](catalog) Refactor Jdbc Catalog external name case mapping rules (#28414)

This commit is contained in:
zy-kkk
2024-02-19 11:19:44 +08:00
committed by yiguolei
parent 8db2824c44
commit b3ac2128dd
27 changed files with 949 additions and 336 deletions

View File

@ -307,7 +307,7 @@ public class SlotRef extends Expr {
if (tableType.equals(TableType.JDBC_EXTERNAL_TABLE) || tableType.equals(TableType.JDBC) || tableType
.equals(TableType.ODBC)) {
if (inputTable instanceof JdbcTable) {
return ((JdbcTable) inputTable).getProperRealColumnName(
return ((JdbcTable) inputTable).getProperRemoteColumnName(
((JdbcTable) inputTable).getJdbcTableType(), col);
} else if (inputTable instanceof OdbcTable) {
return JdbcTable.databaseProperName(((OdbcTable) inputTable).getOdbcTableType(), col);

View File

@ -95,7 +95,6 @@ public class JdbcResource extends Resource {
public static final String DRIVER_URL = "driver_url";
public static final String TYPE = "type";
public static final String ONLY_SPECIFIED_DATABASE = "only_specified_database";
public static final String LOWER_CASE_TABLE_NAMES = "lower_case_table_names";
public static final String CONNECTION_POOL_MIN_SIZE = "connection_pool_min_size";
public static final String CONNECTION_POOL_MAX_SIZE = "connection_pool_max_size";
public static final String CONNECTION_POOL_MAX_WAIT_TIME = "connection_pool_max_wait_time";
@ -112,7 +111,8 @@ public class JdbcResource extends Resource {
TYPE,
CREATE_TIME,
ONLY_SPECIFIED_DATABASE,
LOWER_CASE_TABLE_NAMES,
LOWER_CASE_META_NAMES,
META_NAMES_MAPPING,
INCLUDE_DATABASE_LIST,
EXCLUDE_DATABASE_LIST,
CONNECTION_POOL_MIN_SIZE,
@ -123,7 +123,8 @@ public class JdbcResource extends Resource {
).build();
private static final ImmutableList<String> OPTIONAL_PROPERTIES = new ImmutableList.Builder<String>().add(
ONLY_SPECIFIED_DATABASE,
LOWER_CASE_TABLE_NAMES,
LOWER_CASE_META_NAMES,
META_NAMES_MAPPING,
INCLUDE_DATABASE_LIST,
EXCLUDE_DATABASE_LIST,
CONNECTION_POOL_MIN_SIZE,
@ -139,7 +140,8 @@ public class JdbcResource extends Resource {
static {
OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(ONLY_SPECIFIED_DATABASE, "false");
OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(LOWER_CASE_TABLE_NAMES, "false");
OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(LOWER_CASE_META_NAMES, "false");
OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(META_NAMES_MAPPING, "");
OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(INCLUDE_DATABASE_LIST, "");
OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(EXCLUDE_DATABASE_LIST, "");
OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(CONNECTION_POOL_MIN_SIZE, "1");

View File

@ -53,9 +53,9 @@ public class JdbcTable extends Table {
private static final String CATALOG_ID = "catalog_id";
private static final String TABLE = "table";
private static final String REAL_DATABASE = "real_database";
private static final String REAL_TABLE = "real_table";
private static final String REAL_COLUMNS = "real_columns";
private static final String REMOTE_DATABASE = "remote_database";
private static final String REMOTE_TABLE = "remote_table";
private static final String REMOTE_COLUMNS = "remote_columns";
private static final String RESOURCE = "resource";
private static final String TABLE_TYPE = "table_type";
private static final String URL = "jdbc_url";
@ -69,9 +69,9 @@ public class JdbcTable extends Table {
private String externalTableName;
// remote names, only for jdbc catalog
private String realDatabaseName;
private String realTableName;
private Map<String, String> realColumnNames;
private String remoteDatabaseName;
private String remoteTableName;
private Map<String, String> remoteColumnNames;
private String jdbcTypeName;
@ -122,10 +122,10 @@ public class JdbcTable extends Table {
public String getInsertSql(List<String> insertCols) {
StringBuilder sb = new StringBuilder("INSERT INTO ");
sb.append(getProperRealFullTableName(TABLE_TYPE_MAP.get(getTableTypeName())));
sb.append(getProperRemoteFullTableName(TABLE_TYPE_MAP.get(getTableTypeName())));
sb.append("(");
List<String> transformedInsertCols = insertCols.stream()
.map(col -> getProperRealColumnName(TABLE_TYPE_MAP.get(getTableTypeName()), col))
.map(col -> getProperRemoteColumnName(TABLE_TYPE_MAP.get(getTableTypeName()), col))
.collect(Collectors.toList());
sb.append(String.join(",", transformedInsertCols));
sb.append(")");
@ -249,9 +249,9 @@ public class JdbcTable extends Table {
serializeMap.put(DRIVER_CLASS, driverClass);
serializeMap.put(DRIVER_URL, driverUrl);
serializeMap.put(CHECK_SUM, checkSum);
serializeMap.put(REAL_DATABASE, realDatabaseName);
serializeMap.put(REAL_TABLE, realTableName);
serializeMap.put(REAL_COLUMNS, objectMapper.writeValueAsString(realColumnNames));
serializeMap.put(REMOTE_DATABASE, remoteDatabaseName);
serializeMap.put(REMOTE_TABLE, remoteTableName);
serializeMap.put(REMOTE_COLUMNS, objectMapper.writeValueAsString(remoteColumnNames));
int size = (int) serializeMap.values().stream().filter(v -> {
return v != null;
@ -287,14 +287,14 @@ public class JdbcTable extends Table {
driverClass = serializeMap.get(DRIVER_CLASS);
driverUrl = serializeMap.get(DRIVER_URL);
checkSum = serializeMap.get(CHECK_SUM);
realDatabaseName = serializeMap.get(REAL_DATABASE);
realTableName = serializeMap.get(REAL_TABLE);
String realColumnNamesJson = serializeMap.get(REAL_COLUMNS);
remoteDatabaseName = serializeMap.get(REMOTE_DATABASE);
remoteTableName = serializeMap.get(REMOTE_TABLE);
String realColumnNamesJson = serializeMap.get(REMOTE_COLUMNS);
if (realColumnNamesJson != null) {
realColumnNames = objectMapper.readValue(realColumnNamesJson, new TypeReference<Map<String, String>>() {
remoteColumnNames = objectMapper.readValue(realColumnNamesJson, new TypeReference<Map<String, String>>() {
});
} else {
realColumnNames = Maps.newHashMap();
remoteColumnNames = Maps.newHashMap();
}
}
@ -306,28 +306,28 @@ public class JdbcTable extends Table {
return externalTableName;
}
public String getRealDatabaseName() {
return realDatabaseName;
/**
 * Returns the database name stored in {@code remoteDatabaseName}.
 *
 * @return the stored remote database name; may be {@code null} when it was never set
 *         ({@code getProperRemoteFullTableName} checks for that case)
 */
public String getRemoteDatabaseName() {
return remoteDatabaseName;
}
public String getRealTableName() {
return realTableName;
/**
 * Returns the table name stored in {@code remoteTableName}.
 *
 * @return the stored remote table name; may be {@code null} when it was never set
 *         ({@code getProperRemoteFullTableName} checks for that case)
 */
public String getRemoteTableName() {
return remoteTableName;
}
public String getProperRealFullTableName(TOdbcTableType tableType) {
if (realDatabaseName == null || realTableName == null) {
public String getProperRemoteFullTableName(TOdbcTableType tableType) {
if (remoteDatabaseName == null || remoteTableName == null) {
return databaseProperName(tableType, externalTableName);
} else {
return properNameWithRealName(tableType, realDatabaseName) + "." + properNameWithRealName(tableType,
realTableName);
return properNameWithRemoteName(tableType, remoteDatabaseName) + "." + properNameWithRemoteName(tableType,
remoteTableName);
}
}
public String getProperRealColumnName(TOdbcTableType tableType, String columnName) {
if (realColumnNames == null || realColumnNames.isEmpty() || !realColumnNames.containsKey(columnName)) {
public String getProperRemoteColumnName(TOdbcTableType tableType, String columnName) {
if (remoteColumnNames == null || remoteColumnNames.isEmpty() || !remoteColumnNames.containsKey(columnName)) {
return databaseProperName(tableType, columnName);
} else {
return properNameWithRealName(tableType, realColumnNames.get(columnName));
return properNameWithRemoteName(tableType, remoteColumnNames.get(columnName));
}
}
@ -496,13 +496,13 @@ public class JdbcTable extends Table {
}
}
public static String properNameWithRealName(TOdbcTableType tableType, String name) {
public static String properNameWithRemoteName(TOdbcTableType tableType, String remoteName) {
switch (tableType) {
case MYSQL:
case OCEANBASE:
return formatNameWithRealName(name, "`", "`");
return formatNameWithRemoteName(remoteName, "`", "`");
case SQLSERVER:
return formatNameWithRealName(name, "[", "]");
return formatNameWithRemoteName(remoteName, "[", "]");
case POSTGRESQL:
case CLICKHOUSE:
case TRINO:
@ -510,13 +510,13 @@ public class JdbcTable extends Table {
case OCEANBASE_ORACLE:
case ORACLE:
case SAP_HANA:
return formatNameWithRealName(name, "\"", "\"");
return formatNameWithRemoteName(remoteName, "\"", "\"");
default:
return name;
return remoteName;
}
}
public static String formatNameWithRealName(String name, String wrapStart, String wrapEnd) {
return wrapStart + name + wrapEnd;
/**
 * Surrounds an identifier with the given opening and closing quote strings.
 *
 * <p>The identifier is inserted verbatim: no escaping of quote characters that may already be
 * part of {@code remoteName} is performed.
 *
 * @param remoteName identifier to wrap
 * @param wrapStart text placed directly before the identifier
 * @param wrapEnd text placed directly after the identifier
 * @return {@code wrapStart}, {@code remoteName} and {@code wrapEnd} concatenated in that order
 */
public static String formatNameWithRemoteName(String remoteName, String wrapStart, String wrapEnd) {
    StringBuilder quoted = new StringBuilder();
    quoted.append(wrapStart);
    quoted.append(remoteName);
    quoted.append(wrapEnd);
    return quoted.toString();
}
}

View File

@ -48,6 +48,8 @@ public abstract class Resource implements Writable, GsonPostProcessable {
public static final String REFERENCE_SPLIT = "@";
public static final String INCLUDE_DATABASE_LIST = "include_database_list";
public static final String EXCLUDE_DATABASE_LIST = "exclude_database_list";
public static final String LOWER_CASE_META_NAMES = "lower_case_meta_names";
public static final String META_NAMES_MAPPING = "meta_names_mapping";
public enum ResourceType {
UNKNOWN,

View File

@ -84,10 +84,11 @@ public class JdbcExternalTable extends ExternalTable {
JdbcTable jdbcTable = new JdbcTable(this.id, fullDbName, schema, TableType.JDBC_EXTERNAL_TABLE);
jdbcTable.setCatalogId(jdbcCatalog.getId());
jdbcTable.setExternalTableName(fullDbName);
jdbcTable.setRealDatabaseName(((JdbcExternalCatalog) catalog).getJdbcClient().getRealDatabaseName(this.dbName));
jdbcTable.setRealTableName(
((JdbcExternalCatalog) catalog).getJdbcClient().getRealTableName(this.dbName, this.name));
jdbcTable.setRealColumnNames(((JdbcExternalCatalog) catalog).getJdbcClient().getRealColumnNames(this.dbName,
jdbcTable.setRemoteDatabaseName(
((JdbcExternalCatalog) catalog).getJdbcClient().getRemoteDatabaseName(this.dbName));
jdbcTable.setRemoteTableName(
((JdbcExternalCatalog) catalog).getJdbcClient().getRemoteTableName(this.dbName, this.name));
jdbcTable.setRemoteColumnNames(((JdbcExternalCatalog) catalog).getJdbcClient().getRemoteColumnNames(this.dbName,
this.name));
jdbcTable.setJdbcTypeName(jdbcCatalog.getDatabaseTypeName());
jdbcTable.setJdbcUrl(jdbcCatalog.getJdbcUrl());

View File

@ -119,7 +119,7 @@ public class CatalogFactory {
catalog = new EsExternalCatalog(catalogId, name, resource, props, comment);
break;
case "jdbc":
catalog = new JdbcExternalCatalog(catalogId, name, resource, props, comment);
catalog = new JdbcExternalCatalog(catalogId, name, resource, props, comment, isReplay);
break;
case "iceberg":
catalog = IcebergExternalCatalogFactory.createCatalog(catalogId, name, resource, props, comment);

View File

@ -26,7 +26,6 @@ import org.apache.doris.datasource.InitCatalogLog;
import org.apache.doris.datasource.SessionContext;
import org.apache.doris.datasource.jdbc.client.JdbcClient;
import org.apache.doris.datasource.jdbc.client.JdbcClientConfig;
import org.apache.doris.qe.GlobalVariable;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
@ -34,12 +33,16 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import lombok.Getter;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.List;
import java.util.Map;
@Getter
public class JdbcExternalCatalog extends ExternalCatalog {
private static final Logger LOG = LogManager.getLogger(JdbcExternalCatalog.class);
private static final List<String> REQUIRED_PROPERTIES = ImmutableList.of(
JdbcResource.JDBC_URL,
JdbcResource.DRIVER_URL,
@ -51,10 +54,10 @@ public class JdbcExternalCatalog extends ExternalCatalog {
private transient JdbcClient jdbcClient;
public JdbcExternalCatalog(long catalogId, String name, String resource, Map<String, String> props,
String comment)
String comment, boolean isReplay)
throws DdlException {
super(catalogId, name, InitCatalogLog.Type.JDBC, comment);
this.catalogProperty = new CatalogProperty(resource, processCompatibleProperties(props));
this.catalogProperty = new CatalogProperty(resource, processCompatibleProperties(props, isReplay));
}
@Override
@ -71,7 +74,7 @@ public class JdbcExternalCatalog extends ExternalCatalog {
JdbcResource.validateProperties(propertiesWithoutCheckSum);
JdbcResource.checkBooleanProperty(JdbcResource.ONLY_SPECIFIED_DATABASE, getOnlySpecifiedDatabase());
JdbcResource.checkBooleanProperty(JdbcResource.LOWER_CASE_TABLE_NAMES, getLowerCaseTableNames());
JdbcResource.checkBooleanProperty(JdbcResource.LOWER_CASE_META_NAMES, getLowerCaseMetaNames());
JdbcResource.checkDatabaseListProperties(getOnlySpecifiedDatabase(), getIncludeDatabaseMap(),
getExcludeDatabaseMap());
JdbcResource.checkConnectionPoolProperties(getConnectionPoolMinSize(), getConnectionPoolMaxSize(),
@ -94,11 +97,24 @@ public class JdbcExternalCatalog extends ExternalCatalog {
}
}
private Map<String, String> processCompatibleProperties(Map<String, String> props) throws DdlException {
protected Map<String, String> processCompatibleProperties(Map<String, String> props, boolean isReplay)
throws DdlException {
Map<String, String> properties = Maps.newHashMap();
for (Map.Entry<String, String> kv : props.entrySet()) {
properties.put(StringUtils.removeStart(kv.getKey(), JdbcResource.JDBC_PROPERTIES_PREFIX), kv.getValue());
}
// Modify lower_case_table_names to lower_case_meta_names if it exists
if (properties.containsKey("lower_case_table_names") && isReplay) {
String lowerCaseTableNamesValue = properties.get("lower_case_table_names");
properties.put("lower_case_meta_names", lowerCaseTableNamesValue);
properties.remove("lower_case_table_names");
LOG.info("Modify lower_case_table_names to lower_case_meta_names, value: {}", lowerCaseTableNamesValue);
} else if (properties.containsKey("lower_case_table_names") && !isReplay) {
throw new DdlException("Jdbc catalog property lower_case_table_names is not supported,"
+ " please use lower_case_meta_names instead");
}
String jdbcUrl = properties.getOrDefault(JdbcResource.JDBC_URL, "");
if (!Strings.isNullOrEmpty(jdbcUrl)) {
jdbcUrl = JdbcResource.handleJdbcUrl(jdbcUrl);
@ -145,15 +161,14 @@ public class JdbcExternalCatalog extends ExternalCatalog {
JdbcResource.ONLY_SPECIFIED_DATABASE));
}
public String getLowerCaseTableNames() {
// Forced to true if Config.lower_case_table_names has a value of 1 or 2
if (GlobalVariable.lowerCaseTableNames == 1 || GlobalVariable.lowerCaseTableNames == 2) {
return "true";
}
/**
 * Reads the {@code lower_case_meta_names} catalog property.
 *
 * @return the configured value, or the default registered in {@code JdbcResource} when the
 *         catalog does not define the property
 */
public String getLowerCaseMetaNames() {
    String fallback = JdbcResource.getDefaultPropertyValue(JdbcResource.LOWER_CASE_META_NAMES);
    return catalogProperty.getOrDefault(JdbcResource.LOWER_CASE_META_NAMES, fallback);
}
// Otherwise, it defaults to false
return catalogProperty.getOrDefault(JdbcResource.LOWER_CASE_TABLE_NAMES, JdbcResource.getDefaultPropertyValue(
JdbcResource.LOWER_CASE_TABLE_NAMES));
/**
 * Reads the {@code meta_names_mapping} catalog property.
 *
 * @return the configured value, or the default registered in {@code JdbcResource} when the
 *         catalog does not define the property
 */
public String getMetaNamesMapping() {
    String fallback = JdbcResource.getDefaultPropertyValue(JdbcResource.META_NAMES_MAPPING);
    return catalogProperty.getOrDefault(JdbcResource.META_NAMES_MAPPING, fallback);
}
public int getConnectionPoolMinSize() {
@ -191,7 +206,8 @@ public class JdbcExternalCatalog extends ExternalCatalog {
.setDriverUrl(getDriverUrl())
.setDriverClass(getDriverClass())
.setOnlySpecifiedDatabase(getOnlySpecifiedDatabase())
.setIsLowerCaseTableNames(getLowerCaseTableNames())
.setIsLowerCaseMetaNames(getLowerCaseMetaNames())
.setMetaNamesMapping(getMetaNamesMapping())
.setIncludeDatabaseMap(getIncludeDatabaseMap())
.setExcludeDatabaseMap(getExcludeDatabaseMap())
.setConnectionPoolMinSize(getConnectionPoolMinSize())

View File

@ -0,0 +1,45 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.datasource.jdbc;
import org.apache.doris.datasource.jdbc.client.JdbcClient;
import org.apache.doris.datasource.mapping.IdentifierMapping;
/**
 * {@link IdentifierMapping} backed by a {@link JdbcClient}.
 *
 * <p>Each {@code load*} hook triggers a metadata listing on the client and ignores the returned
 * value. The listing is called for its side effect: the client hands the names it fetched back to
 * this mapping through the {@code set*NameMapping} methods.
 */
public class JdbcIdentifierMapping extends IdentifierMapping {
    private final JdbcClient jdbcClient;

    /**
     * @param isLowerCaseMetaNames forwarded unchanged to the {@link IdentifierMapping} constructor
     * @param metaNamesMapping forwarded unchanged to the {@link IdentifierMapping} constructor
     * @param jdbcClient client used to list remote databases, tables and columns
     */
    public JdbcIdentifierMapping(boolean isLowerCaseMetaNames, String metaNamesMapping, JdbcClient jdbcClient) {
        super(isLowerCaseMetaNames, metaNamesMapping);
        this.jdbcClient = jdbcClient;
    }

    /** Lists the remote databases; the client registers them via {@code setDatabaseNameMapping}. */
    @Override
    protected void loadDatabaseNames() {
        jdbcClient.getDatabaseNameList();
    }

    /** Lists the tables of {@code localDbName}; the client registers them via {@code setTableNameMapping}. */
    @Override
    protected void loadTableNames(String localDbName) {
        jdbcClient.getTablesNameList(localDbName);
    }

    /**
     * Lists the columns of the given table so that the column-name mapping gets populated.
     *
     * <p>This must go through {@code getColumnsFromJdbc}: that is the client method which forwards
     * the fetched columns to {@code setColumnNameMapping}. {@code getJdbcColumnsInfo} only reads the
     * raw column metadata and, as far as the visible client code shows, never touches the mapping,
     * so calling it here would leave the mapping empty.
     */
    @Override
    protected void loadColumnNames(String localDbName, String localTableName) {
        jdbcClient.getColumnsFromJdbc(localDbName, localTableName);
    }
}

View File

@ -23,6 +23,7 @@ import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.jdbc.JdbcIdentifierMapping;
import com.alibaba.druid.pool.DruidDataSource;
import com.google.common.collect.Lists;
@ -43,8 +44,6 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
@Getter
@ -53,29 +52,17 @@ public abstract class JdbcClient {
private static final int HTTP_TIMEOUT_MS = 10000;
protected static final int JDBC_DATETIME_SCALE = 6;
private String catalog;
private String catalogName;
protected String dbType;
protected String jdbcUser;
protected URLClassLoader classLoader = null;
protected DruidDataSource dataSource = null;
protected boolean isOnlySpecifiedDatabase;
protected boolean isLowerCaseTableNames;
protected boolean isLowerCaseMetaNames;
protected String metaNamesMapping;
protected Map<String, Boolean> includeDatabaseMap;
protected Map<String, Boolean> excludeDatabaseMap;
// only used when isLowerCaseTableNames = true.
protected final ConcurrentHashMap<String, String> lowerDBToRealDB = new ConcurrentHashMap<>();
// only used when isLowerCaseTableNames = true.
protected final ConcurrentHashMap<String, ConcurrentHashMap<String, String>> lowerTableToRealTable
= new ConcurrentHashMap<>();
// only used when isLowerCaseTableNames = true.
protected final ConcurrentHashMap<String, ConcurrentHashMap<String, ConcurrentHashMap<String, String>>>
lowerColumnToRealColumn = new ConcurrentHashMap<>();
private final AtomicBoolean dbNamesLoaded = new AtomicBoolean(false);
private final ConcurrentHashMap<String, AtomicBoolean> tableNamesLoadedMap = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, ConcurrentHashMap<String, AtomicBoolean>> columnNamesLoadedMap
= new ConcurrentHashMap<>();
protected JdbcIdentifierMapping jdbcLowerCaseMetaMatching;
public static JdbcClient createJdbcClient(JdbcClientConfig jdbcClientConfig) {
String dbType = parseDbType(jdbcClientConfig.getJdbcUrl());
@ -103,10 +90,11 @@ public abstract class JdbcClient {
}
protected JdbcClient(JdbcClientConfig jdbcClientConfig) {
this.catalog = jdbcClientConfig.getCatalog();
this.catalogName = jdbcClientConfig.getCatalog();
this.jdbcUser = jdbcClientConfig.getUser();
this.isOnlySpecifiedDatabase = Boolean.parseBoolean(jdbcClientConfig.getOnlySpecifiedDatabase());
this.isLowerCaseTableNames = Boolean.parseBoolean(jdbcClientConfig.getIsLowerCaseTableNames());
this.isLowerCaseMetaNames = Boolean.parseBoolean(jdbcClientConfig.getIsLowerCaseMetaNames());
this.metaNamesMapping = jdbcClientConfig.getMetaNamesMapping();
this.includeDatabaseMap =
Optional.ofNullable(jdbcClientConfig.getIncludeDatabaseMap()).orElse(Collections.emptyMap());
this.excludeDatabaseMap =
@ -114,6 +102,7 @@ public abstract class JdbcClient {
String jdbcUrl = jdbcClientConfig.getJdbcUrl();
this.dbType = parseDbType(jdbcUrl);
initializeDataSource(jdbcClientConfig);
this.jdbcLowerCaseMetaMatching = new JdbcIdentifierMapping(isLowerCaseMetaNames, metaNamesMapping, this);
}
// Initialize DruidDataSource
@ -177,7 +166,7 @@ public abstract class JdbcClient {
conn = dataSource.getConnection();
} catch (Exception e) {
String errorMessage = String.format("Can not connect to jdbc due to error: %s, Catalog name: %s", e,
this.getCatalog());
this.getCatalogName());
throw new JdbcClientException(errorMessage, e);
}
return conn;
@ -195,6 +184,25 @@ public abstract class JdbcClient {
}
}
/**
 * Runs a statement directly against the remote database over JDBC.
 *
 * @param origStmt the raw statement text, handed unchanged to {@code Statement.executeUpdate}
 * @throws JdbcClientException if creating or executing the statement raises a {@code SQLException}
 */
public void executeStmt(String origStmt) {
    Connection connection = getConnection();
    Statement statement = null;
    try {
        statement = connection.createStatement();
        int effectedRows = statement.executeUpdate(origStmt);
        LOG.debug("finished to execute dml stmt: {}, effected rows: {}", origStmt, effectedRows);
    } catch (SQLException e) {
        String reason = "Failed to execute stmt. error: " + e.getMessage();
        throw new JdbcClientException(reason, e);
    } finally {
        // Release the statement and the connection whether or not execution succeeded.
        close(statement, connection);
    }
}
// This part used to process meta-information of database, table and column.
/**
@ -209,85 +217,54 @@ public abstract class JdbcClient {
if (isOnlySpecifiedDatabase && includeDatabaseMap.isEmpty() && excludeDatabaseMap.isEmpty()) {
return getSpecifiedDatabase(conn);
}
List<String> databaseNames = Lists.newArrayList();
List<String> remoteDatabaseNames = Lists.newArrayList();
try {
stmt = conn.createStatement();
String sql = getDatabaseQuery();
rs = stmt.executeQuery(sql);
List<String> tempDatabaseNames = Lists.newArrayList();
while (rs.next()) {
String databaseName = rs.getString(1);
if (isLowerCaseTableNames) {
lowerDBToRealDB.put(databaseName.toLowerCase(), databaseName);
databaseName = databaseName.toLowerCase();
} else {
lowerDBToRealDB.put(databaseName, databaseName);
}
tempDatabaseNames.add(databaseName);
}
if (isOnlySpecifiedDatabase) {
for (String db : tempDatabaseNames) {
// Exclude database map take effect with higher priority over include database map
if (!excludeDatabaseMap.isEmpty() && excludeDatabaseMap.containsKey(db)) {
continue;
}
if (!includeDatabaseMap.isEmpty() && !includeDatabaseMap.containsKey(db)) {
continue;
}
databaseNames.add(db);
}
} else {
databaseNames = tempDatabaseNames;
remoteDatabaseNames.add(rs.getString(1));
}
} catch (SQLException e) {
throw new JdbcClientException("failed to get database name list from jdbc", e);
} finally {
close(rs, stmt, conn);
}
return databaseNames;
return filterDatabaseNames(remoteDatabaseNames);
}
/**
* get all tables of one database
*/
public List<String> getTablesNameList(String dbName) {
List<String> tablesName = Lists.newArrayList();
public List<String> getTablesNameList(String localDbName) {
List<String> remoteTablesNames = Lists.newArrayList();
String[] tableTypes = getTableTypes();
String finalDbName = getRealDatabaseName(dbName);
processTable(finalDbName, null, tableTypes, (rs) -> {
String remoteDbName = getRemoteDatabaseName(localDbName);
processTable(remoteDbName, null, tableTypes, (rs) -> {
try {
while (rs.next()) {
String tableName = rs.getString("TABLE_NAME");
if (isLowerCaseTableNames) {
lowerTableToRealTable.putIfAbsent(finalDbName, new ConcurrentHashMap<>());
lowerTableToRealTable.get(finalDbName).put(tableName.toLowerCase(), tableName);
tableName = tableName.toLowerCase();
} else {
lowerTableToRealTable.putIfAbsent(finalDbName, new ConcurrentHashMap<>());
lowerTableToRealTable.get(finalDbName).put(tableName, tableName);
}
tablesName.add(tableName);
remoteTablesNames.add(rs.getString("TABLE_NAME"));
}
} catch (SQLException e) {
throw new JdbcClientException("failed to get all tables for db %s", e, finalDbName);
throw new JdbcClientException("failed to get all tables for remote database %s", e, remoteDbName);
}
});
return tablesName;
return filterTableNames(remoteDbName, remoteTablesNames);
}
public boolean isTableExist(String dbName, String tableName) {
public boolean isTableExist(String localDbName, String localTableName) {
final boolean[] isExist = {false};
String[] tableTypes = getTableTypes();
String finalDbName = getRealDatabaseName(dbName);
String finalTableName = getRealTableName(dbName, tableName);
processTable(finalDbName, finalTableName, tableTypes, (rs) -> {
String remoteDbName = getRemoteDatabaseName(localDbName);
String remoteTableName = getRemoteTableName(localDbName, localTableName);
processTable(remoteDbName, remoteTableName, tableTypes, (rs) -> {
try {
if (rs.next()) {
isExist[0] = true;
}
} catch (SQLException e) {
throw new JdbcClientException("failed to judge if table exist for table %s in db %s",
e, finalTableName, finalDbName);
e, remoteTableName, remoteDbName);
}
});
return isExist[0];
@ -296,29 +273,19 @@ public abstract class JdbcClient {
/**
* get all columns of one table
*/
public List<JdbcFieldSchema> getJdbcColumnsInfo(String dbName, String tableName) {
public List<JdbcFieldSchema> getJdbcColumnsInfo(String localDbName, String localTableName) {
Connection conn = getConnection();
ResultSet rs = null;
List<JdbcFieldSchema> tableSchema = Lists.newArrayList();
String finalDbName = getRealDatabaseName(dbName);
String finalTableName = getRealTableName(dbName, tableName);
String remoteDbName = getRemoteDatabaseName(localDbName);
String remoteTableName = getRemoteTableName(localDbName, localTableName);
try {
DatabaseMetaData databaseMetaData = conn.getMetaData();
String catalogName = getCatalogName(conn);
rs = getColumns(databaseMetaData, catalogName, finalDbName, finalTableName);
rs = getRemoteColumns(databaseMetaData, catalogName, remoteDbName, remoteTableName);
while (rs.next()) {
lowerColumnToRealColumn.putIfAbsent(finalDbName, new ConcurrentHashMap<>());
lowerColumnToRealColumn.get(finalDbName).putIfAbsent(finalTableName, new ConcurrentHashMap<>());
JdbcFieldSchema field = new JdbcFieldSchema();
String columnName = rs.getString("COLUMN_NAME");
if (isLowerCaseTableNames) {
lowerColumnToRealColumn.get(finalDbName).get(finalTableName)
.put(columnName.toLowerCase(), columnName);
columnName = columnName.toLowerCase();
} else {
lowerColumnToRealColumn.get(finalDbName).get(finalTableName).put(columnName, columnName);
}
field.setColumnName(columnName);
field.setColumnName(rs.getString("COLUMN_NAME"));
field.setDataType(rs.getInt("DATA_TYPE"));
field.setDataTypeName(rs.getString("TYPE_NAME"));
/*
@ -340,16 +307,16 @@ public abstract class JdbcClient {
tableSchema.add(field);
}
} catch (SQLException e) {
throw new JdbcClientException("failed to get table name list from jdbc for table %s:%s", e, finalTableName,
Util.getRootCauseMessage(e));
throw new JdbcClientException("failed to get jdbc columns info for table %.%s: %s",
e, localDbName, localTableName, Util.getRootCauseMessage(e));
} finally {
close(rs, conn);
}
return tableSchema;
}
public List<Column> getColumnsFromJdbc(String dbName, String tableName) {
List<JdbcFieldSchema> jdbcTableSchema = getJdbcColumnsInfo(dbName, tableName);
public List<Column> getColumnsFromJdbc(String localDbName, String localTableName) {
List<JdbcFieldSchema> jdbcTableSchema = getJdbcColumnsInfo(localDbName, localTableName);
List<Column> dorisTableSchema = Lists.newArrayListWithCapacity(jdbcTableSchema.size());
for (JdbcFieldSchema field : jdbcTableSchema) {
dorisTableSchema.add(new Column(field.getColumnName(),
@ -357,71 +324,21 @@ public abstract class JdbcClient {
field.isAllowNull(), field.getRemarks(),
true, -1));
}
return dorisTableSchema;
String remoteDbName = getRemoteDatabaseName(localDbName);
String remoteTableName = getRemoteTableName(localDbName, localTableName);
return filterColumnName(remoteDbName, remoteTableName, dorisTableSchema);
}
public String getRealDatabaseName(String dbname) {
if (lowerDBToRealDB == null
|| lowerDBToRealDB.isEmpty()
|| !lowerDBToRealDB.containsKey(dbname)) {
loadDatabaseNamesIfNeeded();
}
return lowerDBToRealDB.get(dbname);
/**
 * Looks up the remote database name for a local database name.
 *
 * <p>Pure delegation to the identifier mapping held in {@code jdbcLowerCaseMetaMatching}.
 *
 * @param localDbname database name to translate
 * @return whatever the mapping's {@code getRemoteDatabaseName} returns for that name
 */
public String getRemoteDatabaseName(String localDbname) {
return jdbcLowerCaseMetaMatching.getRemoteDatabaseName(localDbname);
}
public String getRealTableName(String dbName, String tableName) {
String realDbName = getRealDatabaseName(dbName);
if (lowerTableToRealTable == null
|| lowerTableToRealTable.isEmpty()
|| !lowerTableToRealTable.containsKey(realDbName)
|| lowerTableToRealTable.get(realDbName) == null
|| lowerTableToRealTable.get(realDbName).isEmpty()
|| !lowerTableToRealTable.get(realDbName).containsKey(tableName)
|| lowerTableToRealTable.get(realDbName).get(tableName) == null) {
loadTableNamesIfNeeded(dbName);
}
return lowerTableToRealTable.get(realDbName).get(tableName);
/**
 * Looks up the remote table name for a local database/table pair.
 *
 * <p>Pure delegation to the identifier mapping held in {@code jdbcLowerCaseMetaMatching}.
 *
 * @param localDbName database name the table belongs to
 * @param localTableName table name to translate
 * @return whatever the mapping's {@code getRemoteTableName} returns for that pair
 */
public String getRemoteTableName(String localDbName, String localTableName) {
return jdbcLowerCaseMetaMatching.getRemoteTableName(localDbName, localTableName);
}
public Map<String, String> getRealColumnNames(String dbName, String tableName) {
String realDbName = getRealDatabaseName(dbName);
String realTableName = getRealTableName(dbName, tableName);
if (lowerColumnToRealColumn == null
|| lowerColumnToRealColumn.isEmpty()
|| !lowerColumnToRealColumn.containsKey(realDbName)
|| lowerColumnToRealColumn.get(realDbName) == null
|| lowerColumnToRealColumn.get(realDbName).isEmpty()
|| !lowerColumnToRealColumn.get(realDbName).containsKey(realTableName)
|| lowerColumnToRealColumn.get(realDbName).get(realTableName) == null
|| lowerColumnToRealColumn.get(realDbName).get(realTableName).isEmpty()) {
loadColumnNamesIfNeeded(dbName, tableName);
}
return lowerColumnToRealColumn.get(realDbName).get(realTableName);
}
private void loadDatabaseNamesIfNeeded() {
if (dbNamesLoaded.compareAndSet(false, true)) {
getDatabaseNameList();
}
}
private void loadTableNamesIfNeeded(String dbName) {
AtomicBoolean isLoaded = tableNamesLoadedMap.computeIfAbsent(dbName, k -> new AtomicBoolean(false));
if (isLoaded.compareAndSet(false, true)) {
getTablesNameList(dbName);
}
}
private void loadColumnNamesIfNeeded(String dbName, String tableName) {
ConcurrentHashMap<String, AtomicBoolean> tableMap = columnNamesLoadedMap.computeIfAbsent(dbName,
k -> new ConcurrentHashMap<>());
AtomicBoolean isLoaded = tableMap.computeIfAbsent(tableName, k -> new AtomicBoolean(false));
if (isLoaded.compareAndSet(false, true)) {
getJdbcColumnsInfo(dbName, tableName);
}
/**
 * Returns the column-name map the identifier mapping holds for a local database/table pair.
 *
 * <p>Pure delegation to {@code jdbcLowerCaseMetaMatching}.
 *
 * @param localDbName database name the table belongs to
 * @param localTableName table whose column names are requested
 * @return the map returned by the mapping's {@code getRemoteColumnNames}
 *         (presumably local column name to remote column name; confirm against IdentifierMapping)
 */
public Map<String, String> getRemoteColumnNames(String localDbName, String localTableName) {
return jdbcLowerCaseMetaMatching.getRemoteColumnNames(localDbName, localTableName);
}
// protected methods, for subclasses to override
@ -447,14 +364,14 @@ public abstract class JdbcClient {
return new String[] {"TABLE", "VIEW"};
}
protected void processTable(String dbName, String tableName, String[] tableTypes,
protected void processTable(String remoteDbName, String remoteTableName, String[] tableTypes,
Consumer<ResultSet> resultSetConsumer) {
Connection conn = getConnection();
ResultSet rs = null;
try {
DatabaseMetaData databaseMetaData = conn.getMetaData();
String catalogName = getCatalogName(conn);
rs = databaseMetaData.getTables(catalogName, dbName, tableName, tableTypes);
rs = databaseMetaData.getTables(catalogName, remoteDbName, remoteTableName, tableTypes);
resultSetConsumer.accept(rs);
} catch (SQLException e) {
throw new JdbcClientException("Failed to process table", e);
@ -463,36 +380,41 @@ public abstract class JdbcClient {
}
}
protected String modifyTableNameIfNecessary(String tableName) {
return tableName;
/**
 * Extension point for adjusting a remote table name; this base implementation returns the
 * argument unchanged.
 *
 * @param remoteTableName table name to adjust
 * @return {@code remoteTableName} itself
 */
protected String modifyTableNameIfNecessary(String remoteTableName) {
return remoteTableName;
}
/**
 * Extension point reporting whether a table name was altered; this base implementation always
 * answers {@code false} and ignores both arguments.
 *
 * @param modifiedTableName table name after modification
 * @param actualTableName table name to compare against
 * @return {@code false}
 */
protected boolean isTableModified(String modifiedTableName, String actualTableName) {
return false;
}
protected ResultSet getColumns(DatabaseMetaData databaseMetaData, String catalogName, String schemaName,
String tableName) throws SQLException {
return databaseMetaData.getColumns(catalogName, schemaName, tableName, null);
protected ResultSet getRemoteColumns(DatabaseMetaData databaseMetaData, String catalogName, String remoteDbName,
String remoteTableName) throws SQLException {
return databaseMetaData.getColumns(catalogName, remoteDbName, remoteTableName, null);
}
/**
* Execute stmt direct via jdbc
*
* @param origStmt, the raw stmt string
*/
public void executeStmt(String origStmt) {
Connection conn = getConnection();
Statement stmt = null;
try {
stmt = conn.createStatement();
int effectedRows = stmt.executeUpdate(origStmt);
LOG.debug("finished to execute dml stmt: {}, effected rows: {}", origStmt, effectedRows);
} catch (SQLException e) {
throw new JdbcClientException("Failed to execute stmt. error: " + e.getMessage(), e);
} finally {
close(stmt, conn);
/**
 * Applies the include/exclude database filters and registers the surviving names with
 * {@code jdbcLowerCaseMetaMatching}.
 *
 * <p>The filters are only consulted when {@code isOnlySpecifiedDatabase} is set. A name is
 * dropped when it is present in a non-empty exclude map, or absent from a non-empty include
 * map; the exclude check is evaluated first.
 *
 * @param remoteDbNames database names as reported by the remote source
 * @return the list returned by {@code jdbcLowerCaseMetaMatching.setDatabaseNameMapping}
 */
protected List<String> filterDatabaseNames(List<String> remoteDbNames) {
    List<String> keptNames = Lists.newArrayList();
    for (String candidate : remoteDbNames) {
        // Short-circuit order matches the filter semantics: flag first, then exclude, then include.
        boolean skipped = isOnlySpecifiedDatabase
                && ((!excludeDatabaseMap.isEmpty() && excludeDatabaseMap.containsKey(candidate))
                || (!includeDatabaseMap.isEmpty() && !includeDatabaseMap.containsKey(candidate)));
        if (!skipped) {
            keptNames.add(candidate);
        }
    }
    return jdbcLowerCaseMetaMatching.setDatabaseNameMapping(keptNames);
}
/**
 * Registers the table names of a remote database with {@code jdbcLowerCaseMetaMatching}
 * and returns the resulting list.
 *
 * <p>NOTE(review): the parameter is called {@code localTableNames}, but it is forwarded
 * unchanged to {@code setTableNameMapping} — confirm whether callers pass remote names.
 *
 * @param remoteDbName    remote database (schema) name
 * @param localTableNames table names to register
 * @return the list returned by {@code jdbcLowerCaseMetaMatching.setTableNameMapping}
 */
protected List<String> filterTableNames(String remoteDbName, List<String> localTableNames) {
    final List<String> mappedTableNames =
            jdbcLowerCaseMetaMatching.setTableNameMapping(remoteDbName, localTableNames);
    return mappedTableNames;
}
/**
 * Registers the columns of a remote table with {@code jdbcLowerCaseMetaMatching} and
 * returns the resulting list.
 *
 * @param remoteDbName    remote database (schema) name
 * @param remoteTableName remote table name
 * @param remoteColumns   columns read from the remote table
 * @return the list returned by {@code jdbcLowerCaseMetaMatching.setColumnNameMapping}
 */
protected List<Column> filterColumnName(String remoteDbName, String remoteTableName, List<Column> remoteColumns) {
    final List<Column> mappedColumns =
            jdbcLowerCaseMetaMatching.setColumnNameMapping(remoteDbName, remoteTableName, remoteColumns);
    return mappedColumns;
}
@Data

View File

@ -32,7 +32,8 @@ public class JdbcClientConfig implements Cloneable {
private String driverUrl;
private String driverClass;
private String onlySpecifiedDatabase;
private String isLowerCaseTableNames;
private String isLowerCaseMetaNames;
private String metaNamesMapping;
private int connectionPoolMinSize;
private int connectionPoolMaxSize;
private int connectionPoolMaxWaitTime;
@ -45,7 +46,8 @@ public class JdbcClientConfig implements Cloneable {
public JdbcClientConfig() {
this.onlySpecifiedDatabase = JdbcResource.getDefaultPropertyValue(JdbcResource.ONLY_SPECIFIED_DATABASE);
this.isLowerCaseTableNames = JdbcResource.getDefaultPropertyValue(JdbcResource.LOWER_CASE_TABLE_NAMES);
this.isLowerCaseMetaNames = JdbcResource.getDefaultPropertyValue(JdbcResource.LOWER_CASE_META_NAMES);
this.metaNamesMapping = JdbcResource.getDefaultPropertyValue(JdbcResource.META_NAMES_MAPPING);
this.connectionPoolMinSize = Integer.parseInt(
JdbcResource.getDefaultPropertyValue(JdbcResource.CONNECTION_POOL_MIN_SIZE));
this.connectionPoolMaxSize = Integer.parseInt(
@ -143,12 +145,21 @@ public class JdbcClientConfig implements Cloneable {
return this;
}
public String getIsLowerCaseTableNames() {
return isLowerCaseTableNames;
public String getIsLowerCaseMetaNames() {
return isLowerCaseMetaNames;
}
public JdbcClientConfig setIsLowerCaseTableNames(String isLowerCaseTableNames) {
this.isLowerCaseTableNames = isLowerCaseTableNames;
public JdbcClientConfig setIsLowerCaseMetaNames(String isLowerCaseTableNames) {
this.isLowerCaseMetaNames = isLowerCaseTableNames;
return this;
}
/**
 * Returns the configured {@code metaNamesMapping} value as stored, without any parsing.
 *
 * @return the raw {@code metaNamesMapping} string
 */
public String getMetaNamesMapping() {
return metaNamesMapping;
}
/**
 * Stores the {@code metaNamesMapping} value; the string is not validated here.
 *
 * @param metaNamesMapping the raw mapping string to store
 * @return this config, so that setters can be chained
 */
public JdbcClientConfig setMetaNamesMapping(String metaNamesMapping) {
this.metaNamesMapping = metaNamesMapping;
return this;
}

View File

@ -33,7 +33,6 @@ import java.sql.SQLException;
import java.sql.Statement;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
public class JdbcMySQLClient extends JdbcClient {
@ -81,14 +80,14 @@ public class JdbcMySQLClient extends JdbcClient {
}
@Override
protected void processTable(String dbName, String tableName, String[] tableTypes,
Consumer<ResultSet> resultSetConsumer) {
protected void processTable(String remoteDbName, String remoteTableName, String[] tableTypes,
Consumer<ResultSet> resultSetConsumer) {
Connection conn = null;
ResultSet rs = null;
try {
conn = super.getConnection();
DatabaseMetaData databaseMetaData = conn.getMetaData();
rs = databaseMetaData.getTables(dbName, null, tableName, tableTypes);
rs = databaseMetaData.getTables(remoteDbName, null, remoteTableName, tableTypes);
resultSetConsumer.accept(rs);
} catch (SQLException e) {
throw new JdbcClientException("Failed to process table", e);
@ -103,39 +102,29 @@ public class JdbcMySQLClient extends JdbcClient {
}
@Override
protected ResultSet getColumns(DatabaseMetaData databaseMetaData, String catalogName, String schemaName,
String tableName) throws SQLException {
return databaseMetaData.getColumns(schemaName, null, tableName, null);
protected ResultSet getRemoteColumns(DatabaseMetaData databaseMetaData, String catalogName, String remoteDbName,
String remoteTableName) throws SQLException {
return databaseMetaData.getColumns(remoteDbName, null, remoteTableName, null);
}
/**
* get all columns of one table
*/
@Override
public List<JdbcFieldSchema> getJdbcColumnsInfo(String dbName, String tableName) {
public List<JdbcFieldSchema> getJdbcColumnsInfo(String localDbName, String localTableName) {
Connection conn = getConnection();
ResultSet rs = null;
List<JdbcFieldSchema> tableSchema = Lists.newArrayList();
String finalDbName = getRealDatabaseName(dbName);
String finalTableName = getRealTableName(dbName, tableName);
List<JdbcFieldSchema> tableSchema = com.google.common.collect.Lists.newArrayList();
String remoteDbName = getRemoteDatabaseName(localDbName);
String remoteTableName = getRemoteTableName(localDbName, localTableName);
try {
DatabaseMetaData databaseMetaData = conn.getMetaData();
String catalogName = getCatalogName(conn);
rs = getColumns(databaseMetaData, catalogName, finalDbName, finalTableName);
rs = getRemoteColumns(databaseMetaData, catalogName, remoteDbName, remoteTableName);
Map<String, String> mapFieldtoType = null;
while (rs.next()) {
lowerColumnToRealColumn.putIfAbsent(finalDbName, new ConcurrentHashMap<>());
lowerColumnToRealColumn.get(finalDbName).putIfAbsent(finalTableName, new ConcurrentHashMap<>());
JdbcFieldSchema field = new JdbcFieldSchema();
String columnName = rs.getString("COLUMN_NAME");
if (isLowerCaseTableNames) {
lowerColumnToRealColumn.get(finalDbName).get(finalTableName)
.put(columnName.toLowerCase(), columnName);
columnName = columnName.toLowerCase();
} else {
lowerColumnToRealColumn.get(finalDbName).get(finalTableName).put(columnName, columnName);
}
field.setColumnName(columnName);
field.setColumnName(rs.getString("COLUMN_NAME"));
field.setDataType(rs.getInt("DATA_TYPE"));
// in mysql-jdbc-connector-8.0.*, TYPE_NAME of the HLL column in doris will be "UNKNOWN"
@ -144,7 +133,7 @@ public class JdbcMySQLClient extends JdbcClient {
// in mysql-jdbc-connector-5.1.*, TYPE_NAME of BITMAP column in doris will be "BITMAP"
field.setDataTypeName(rs.getString("TYPE_NAME"));
if (isDoris) {
mapFieldtoType = getColumnsDataTypeUseQuery(dbName, tableName);
mapFieldtoType = getColumnsDataTypeUseQuery(localDbName, localTableName);
field.setDataTypeName(mapFieldtoType.get(rs.getString("COLUMN_NAME")));
}
field.setColumnSize(rs.getInt("COLUMN_SIZE"));
@ -163,7 +152,7 @@ public class JdbcMySQLClient extends JdbcClient {
}
} catch (SQLException e) {
throw new JdbcClientException("failed to get jdbc columns info for table %.%s: %s",
e, dbName, tableName, Util.getRootCauseMessage(e));
e, localDbName, localTableName, Util.getRootCauseMessage(e));
} finally {
close(rs, conn);
}

View File

@ -28,7 +28,6 @@ import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
public class JdbcOracleClient extends JdbcClient {
@ -56,78 +55,46 @@ public class JdbcOracleClient extends JdbcClient {
List<String> databaseNames = Lists.newArrayList();
try {
rs = conn.getMetaData().getSchemas(conn.getCatalog(), null);
List<String> tempDatabaseNames = Lists.newArrayList();
while (rs.next()) {
String databaseName = rs.getString("TABLE_SCHEM");
if (isLowerCaseTableNames) {
lowerDBToRealDB.put(databaseName.toLowerCase(), databaseName);
databaseName = databaseName.toLowerCase();
} else {
lowerDBToRealDB.put(databaseName, databaseName);
}
tempDatabaseNames.add(databaseName);
}
if (isOnlySpecifiedDatabase) {
for (String db : tempDatabaseNames) {
// Exclude database map take effect with higher priority over include database map
if (!excludeDatabaseMap.isEmpty() && excludeDatabaseMap.containsKey(db)) {
continue;
}
if (!includeDatabaseMap.isEmpty() && !includeDatabaseMap.containsKey(db)) {
continue;
}
databaseNames.add(db);
}
} else {
databaseNames = tempDatabaseNames;
databaseNames.add(rs.getString("TABLE_SCHEM"));
}
} catch (SQLException e) {
throw new JdbcClientException("failed to get database name list from jdbc", e);
} finally {
close(rs, conn);
}
return databaseNames;
return filterDatabaseNames(databaseNames);
}
@Override
public List<JdbcFieldSchema> getJdbcColumnsInfo(String dbName, String tableName) {
public List<JdbcFieldSchema> getJdbcColumnsInfo(String localDbName, String localTableName) {
Connection conn = getConnection();
ResultSet rs = null;
List<JdbcFieldSchema> tableSchema = Lists.newArrayList();
String finalDbName = getRealDatabaseName(dbName);
String finalTableName = getRealTableName(dbName, tableName);
String remoteDbName = getRemoteDatabaseName(localDbName);
String remoteTableName = getRemoteTableName(localDbName, localTableName);
try {
DatabaseMetaData databaseMetaData = conn.getMetaData();
String catalogName = getCatalogName(conn);
String modifiedTableName;
boolean isModify = false;
if (finalTableName.contains("/")) {
modifiedTableName = modifyTableNameIfNecessary(finalTableName);
isModify = !modifiedTableName.equals(finalTableName);
if (remoteTableName.contains("/")) {
modifiedTableName = modifyTableNameIfNecessary(remoteTableName);
isModify = !modifiedTableName.equals(remoteTableName);
if (isModify) {
rs = getColumns(databaseMetaData, catalogName, finalDbName, modifiedTableName);
rs = getRemoteColumns(databaseMetaData, catalogName, remoteDbName, modifiedTableName);
} else {
rs = getColumns(databaseMetaData, catalogName, finalDbName, finalTableName);
rs = getRemoteColumns(databaseMetaData, catalogName, remoteDbName, remoteTableName);
}
} else {
rs = getColumns(databaseMetaData, catalogName, finalDbName, finalTableName);
rs = getRemoteColumns(databaseMetaData, catalogName, remoteDbName, remoteTableName);
}
while (rs.next()) {
if (isModify && isTableModified(rs.getString("TABLE_NAME"), finalTableName)) {
if (isModify && isTableModified(rs.getString("TABLE_NAME"), remoteTableName)) {
continue;
}
lowerColumnToRealColumn.putIfAbsent(finalDbName, new ConcurrentHashMap<>());
lowerColumnToRealColumn.get(finalDbName).putIfAbsent(finalTableName, new ConcurrentHashMap<>());
JdbcFieldSchema field = new JdbcFieldSchema();
String columnName = rs.getString("COLUMN_NAME");
if (isLowerCaseTableNames) {
lowerColumnToRealColumn.get(finalDbName).get(finalTableName)
.put(columnName.toLowerCase(), columnName);
columnName = columnName.toLowerCase();
} else {
lowerColumnToRealColumn.get(finalDbName).get(finalTableName).put(columnName, columnName);
}
field.setColumnName(columnName);
field.setColumnName(rs.getString("COLUMN_NAME"));
field.setDataType(rs.getInt("DATA_TYPE"));
field.setDataTypeName(rs.getString("TYPE_NAME"));
/*
@ -149,7 +116,7 @@ public class JdbcOracleClient extends JdbcClient {
tableSchema.add(field);
}
} catch (SQLException e) {
throw new JdbcClientException("failed to get table name list from jdbc for table %s:%s", e, finalTableName,
throw new JdbcClientException("failed to get table name list from jdbc for table %s:%s", e, remoteTableName,
Util.getRootCauseMessage(e));
} finally {
close(rs, conn);
@ -158,8 +125,8 @@ public class JdbcOracleClient extends JdbcClient {
}
@Override
protected String modifyTableNameIfNecessary(String tableName) {
return tableName.replace("/", "%");
protected String modifyTableNameIfNecessary(String remoteTableName) {
return remoteTableName.replace("/", "%");
}
@Override

View File

@ -0,0 +1,299 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.datasource.mapping;
import org.apache.doris.catalog.Column;
import org.apache.doris.qe.GlobalVariable;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
/**
 * Keeps track of how local identifiers map to the remote identifiers of an external data source,
 * for databases, tables and columns.
 *
 * <p>The {@code set*NameMapping} methods fill the mappings. They apply the optional
 * {@code meta_names_mapping} JSON (a root object that may contain the arrays {@code "databases"},
 * {@code "tables"} and {@code "columns"}, whose entries carry {@code remoteDatabase},
 * {@code remoteTable}, {@code remoteColumn} and {@code mapping}) and, when enabled, lower-casing.
 * The {@code getRemote*} lookups call the abstract {@code load*} hooks when a name is not cached yet.
 */
public abstract class IdentifierMapping {

    // ObjectMapper is documented as thread-safe once configured, so one shared instance is enough.
    private static final ObjectMapper MAPPER = new ObjectMapper();

    // local database name -> remote database name
    private final ConcurrentHashMap<String, String> localDBToRemoteDB = new ConcurrentHashMap<>();
    // remote database name -> (local table name -> remote table name)
    private final ConcurrentHashMap<String, ConcurrentHashMap<String, String>> localTableToRemoteTable
            = new ConcurrentHashMap<>();
    // remote database name -> remote table name -> (local column name -> remote column name)
    private final ConcurrentHashMap<String, ConcurrentHashMap<String, ConcurrentHashMap<String, String>>>
            localColumnToRemoteColumn = new ConcurrentHashMap<>();

    // "Load already attempted" flags; the table and column flags are keyed by LOCAL names.
    private final AtomicBoolean dbNamesLoaded = new AtomicBoolean(false);
    private final ConcurrentHashMap<String, AtomicBoolean> tableNamesLoadedMap = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<String, ConcurrentHashMap<String, AtomicBoolean>> columnNamesLoadedMap
            = new ConcurrentHashMap<>();

    private final boolean isLowerCaseMetaNames;
    private final String metaNamesMapping;

    /**
     * @param isLowerCaseMetaNames whether local names are the lower-cased form of the (mapped) remote names
     * @param metaNamesMapping     JSON text describing explicit name mappings; null or blank means none
     */
    public IdentifierMapping(boolean isLowerCaseMetaNames, String metaNamesMapping) {
        this.isLowerCaseMetaNames = isLowerCaseMetaNames;
        this.metaNamesMapping = metaNamesMapping;
    }

    /**
     * Registers the given remote database names and returns the corresponding local names.
     *
     * @param remoteDatabaseNames database names as reported by the remote source
     * @return local database names, in the same order as the input
     * @throws RuntimeException if lower-casing makes two names collide, or the mapping JSON is invalid
     */
    public List<String> setDatabaseNameMapping(List<String> remoteDatabaseNames) {
        JsonNode databasesNode = readAndParseJson(metaNamesMapping, "databases");
        Map<String, String> databaseNameMapping = Maps.newTreeMap();
        if (databasesNode.isArray()) {
            for (JsonNode node : databasesNode) {
                String remoteDatabase = node.path("remoteDatabase").asText();
                String mapping = node.path("mapping").asText();
                databaseNameMapping.put(remoteDatabase, mapping);
            }
        }

        Map<String, List<String>> result = nameListToMapping(remoteDatabaseNames, localDBToRemoteDB,
                databaseNameMapping, isLowerCaseMetaNames);
        List<String> conflictNames = result.get("conflictNames");
        if (!conflictNames.isEmpty()) {
            throw new RuntimeException(
                    "Conflict database/schema names found when lower_case_meta_names is true: " + conflictNames
                            + ". Please set lower_case_meta_names to false or"
                            + " use meta_names_mapping to specify the names.");
        }
        return result.get("localNames");
    }

    /**
     * Registers the table names of one remote database and returns the corresponding local names.
     *
     * <p>Lower-casing is forced when {@code GlobalVariable.lowerCaseTableNames == 1}; otherwise it
     * follows {@code isLowerCaseMetaNames}.
     *
     * @param remoteDbName     remote database (schema) name the tables belong to
     * @param remoteTableNames table names as reported by the remote source
     * @return local table names, in the same order as the input
     * @throws RuntimeException if lower-casing makes two names collide, or the mapping JSON is invalid
     */
    public List<String> setTableNameMapping(String remoteDbName, List<String> remoteTableNames) {
        JsonNode tablesNode = readAndParseJson(metaNamesMapping, "tables");
        Map<String, String> tableNameMapping = Maps.newTreeMap();
        if (tablesNode.isArray()) {
            for (JsonNode node : tablesNode) {
                String remoteDatabase = node.path("remoteDatabase").asText();
                if (remoteDbName.equals(remoteDatabase)) {
                    String remoteTable = node.path("remoteTable").asText();
                    String mapping = node.path("mapping").asText();
                    tableNameMapping.put(remoteTable, mapping);
                }
            }
        }

        ConcurrentHashMap<String, String> localToRemote =
                localTableToRemoteTable.computeIfAbsent(remoteDbName, k -> new ConcurrentHashMap<>());
        boolean forcedByGlobalVariable = GlobalVariable.lowerCaseTableNames == 1;
        Map<String, List<String>> result = nameListToMapping(remoteTableNames, localToRemote,
                tableNameMapping, forcedByGlobalVariable || isLowerCaseMetaNames);

        List<String> conflictNames = result.get("conflictNames");
        if (!conflictNames.isEmpty()) {
            if (forcedByGlobalVariable) {
                throw new RuntimeException(
                        "Conflict table names found in remote database/schema: " + remoteDbName
                                + " when lower_case_table_names is 1: " + conflictNames
                                + ". Please use meta_names_mapping to specify the names.");
            }
            throw new RuntimeException(
                    "Conflict table names found in remote database/schema: " + remoteDbName
                            + " when lower_case_meta_names is true: " + conflictNames
                            + ". Please set lower_case_meta_names to false or"
                            + " use meta_names_mapping to specify the table names.");
        }
        return result.get("localNames");
    }

    /**
     * Registers the columns of one remote table and renames them to their local names.
     *
     * <p>The {@code Column} objects in {@code remoteColumns} are modified in place via
     * {@code setName} and the same list instance is returned.
     *
     * @param remoteDbName    remote database (schema) name
     * @param remoteTableName remote table name
     * @param remoteColumns   columns carrying their remote names
     * @return {@code remoteColumns}, with every column renamed to its local name
     * @throws RuntimeException if lower-casing makes two names collide, or the mapping JSON is invalid
     */
    public List<Column> setColumnNameMapping(String remoteDbName, String remoteTableName, List<Column> remoteColumns) {
        JsonNode columnsNode = readAndParseJson(metaNamesMapping, "columns");
        Map<String, String> columnNameMapping = Maps.newTreeMap();
        if (columnsNode.isArray()) {
            for (JsonNode node : columnsNode) {
                String remoteDatabase = node.path("remoteDatabase").asText();
                String remoteTable = node.path("remoteTable").asText();
                if (remoteDbName.equals(remoteDatabase) && remoteTable.equals(remoteTableName)) {
                    String remoteColumn = node.path("remoteColumn").asText();
                    String mapping = node.path("mapping").asText();
                    columnNameMapping.put(remoteColumn, mapping);
                }
            }
        }

        ConcurrentHashMap<String, String> localToRemote = localColumnToRemoteColumn
                .computeIfAbsent(remoteDbName, k -> new ConcurrentHashMap<>())
                .computeIfAbsent(remoteTableName, k -> new ConcurrentHashMap<>());

        // Collect the remote column names so they can be mapped as a plain name list.
        List<String> remoteColumnNames = Lists.newArrayList();
        for (Column remoteColumn : remoteColumns) {
            remoteColumnNames.add(remoteColumn.getName());
        }

        Map<String, List<String>> result = nameListToMapping(remoteColumnNames, localToRemote,
                columnNameMapping, isLowerCaseMetaNames);
        List<String> localColumnNames = result.get("localNames");
        List<String> conflictNames = result.get("conflictNames");
        if (!conflictNames.isEmpty()) {
            throw new RuntimeException(
                    "Conflict column names found in remote database/schema: " + remoteDbName
                            + " in remote table: " + remoteTableName
                            + " when lower_case_meta_names is true: " + conflictNames
                            + ". Please set lower_case_meta_names to false or"
                            + " use meta_names_mapping to specify the column names.");
        }

        // localColumnNames is index-aligned with remoteColumns, so rename position by position.
        for (int i = 0; i < remoteColumns.size(); i++) {
            remoteColumns.get(i).setName(localColumnNames.get(i));
        }
        return remoteColumns;
    }

    /**
     * Resolves a local database name to its remote name, loading the database names if the
     * name is not cached.
     *
     * @param localDbName local database name
     * @return the remote database name, or {@code null} if no mapping is known after loading
     */
    public String getRemoteDatabaseName(String localDbName) {
        if (!localDBToRemoteDB.containsKey(localDbName)) {
            loadDatabaseNamesIfNeeded();
        }
        return localDBToRemoteDB.get(localDbName);
    }

    /**
     * Resolves a local table name to its remote name, loading the table names of the database
     * if the name is not cached.
     *
     * <p>NOTE(review): when the database itself has no mapping, or no table map exists for it
     * after loading, the map lookups below throw {@link NullPointerException}; this behaviour is
     * kept as is because visible callers do not null-check the result.
     *
     * @param localDbName    local database name
     * @param localTableName local table name
     * @return the remote table name, or {@code null} if the table is unknown in a known database
     */
    public String getRemoteTableName(String localDbName, String localTableName) {
        String remoteDbName = getRemoteDatabaseName(localDbName);
        Map<String, String> cachedTables = localTableToRemoteTable.get(remoteDbName);
        if (cachedTables == null || !cachedTables.containsKey(localTableName)) {
            loadTableNamesIfNeeded(localDbName);
        }
        return localTableToRemoteTable.get(remoteDbName).get(localTableName);
    }

    /**
     * Returns the local-to-remote column name map of a table, loading the columns if nothing
     * is cached for that table.
     *
     * <p>NOTE(review): as with {@link #getRemoteTableName}, an unknown database or table leads
     * to a {@link NullPointerException} from the map lookups.
     *
     * @param localDbName    local database name
     * @param localTableName local table name
     * @return map from local column name to remote column name; {@code null} if the table has
     *         no column map although its database has one
     */
    public Map<String, String> getRemoteColumnNames(String localDbName, String localTableName) {
        String remoteDbName = getRemoteDatabaseName(localDbName);
        String remoteTableName = getRemoteTableName(localDbName, localTableName);
        Map<String, ConcurrentHashMap<String, String>> cachedTables = localColumnToRemoteColumn.get(remoteDbName);
        Map<String, String> cachedColumns = cachedTables == null ? null : cachedTables.get(remoteTableName);
        if (cachedColumns == null || cachedColumns.isEmpty()) {
            loadColumnNamesIfNeeded(localDbName, localTableName);
        }
        return localColumnToRemoteColumn.get(remoteDbName).get(remoteTableName);
    }

    private void loadDatabaseNamesIfNeeded() {
        loadOnce(dbNamesLoaded, this::loadDatabaseNames);
    }

    private void loadTableNamesIfNeeded(String localDbName) {
        AtomicBoolean isLoaded = tableNamesLoadedMap.computeIfAbsent(localDbName, k -> new AtomicBoolean(false));
        loadOnce(isLoaded, () -> loadTableNames(localDbName));
    }

    private void loadColumnNamesIfNeeded(String localDbName, String localTableName) {
        AtomicBoolean isLoaded = columnNamesLoadedMap
                .computeIfAbsent(localDbName, k -> new ConcurrentHashMap<>())
                .computeIfAbsent(localTableName, k -> new AtomicBoolean(false));
        loadOnce(isLoaded, () -> loadColumnNames(localDbName, localTableName));
    }

    /**
     * Runs {@code loader} if {@code loadedFlag} has not been claimed yet. If the loader throws,
     * the flag is reset so that a later lookup retries instead of treating the failed load as done.
     */
    private static void loadOnce(AtomicBoolean loadedFlag, Runnable loader) {
        if (!loadedFlag.compareAndSet(false, true)) {
            return;
        }
        boolean succeeded = false;
        try {
            loader.run();
            succeeded = true;
        } finally {
            if (!succeeded) {
                loadedFlag.set(false);
            }
        }
    }

    /**
     * Load the database names from the data source.
     * In the corresponding getDatabaseNameList(), setDatabaseNameMapping() must be used to update the mapping.
     */
    protected abstract void loadDatabaseNames();

    /**
     * Load the table names for the specified database from the data source.
     * In the corresponding getTableNameList(), setTableNameMapping() must be used to update the mapping.
     */
    protected abstract void loadTableNames(String localDbName);

    /**
     * Load the column names for a specified table in a database from the data source.
     * In the corresponding getColumnNameList(), setColumnNameMapping() must be used to update the mapping.
     */
    protected abstract void loadColumnNames(String localDbName, String localTableName);

    /**
     * Parses the mapping JSON and returns the child node called {@code nodeName}.
     *
     * @param jsonPath JSON text (despite the name, this is the content, not a file path)
     * @param nodeName name of the child to return
     * @return the child node, or a missing node when the JSON is null/blank or has no such child
     * @throws RuntimeException if the text is not valid JSON
     */
    private JsonNode readAndParseJson(String jsonPath, String nodeName) {
        // No mapping configured: behave as if an empty JSON object had been supplied.
        if (jsonPath == null || jsonPath.trim().isEmpty()) {
            return MAPPER.createObjectNode().path(nodeName);
        }
        try {
            JsonNode rootNode = MAPPER.readTree(jsonPath);
            if (rootNode == null) {
                // Some Jackson versions return null for content without any token.
                return MAPPER.createObjectNode().path(nodeName);
            }
            return rootNode.path(nodeName);
        } catch (JsonProcessingException e) {
            throw new RuntimeException("parse meta_names_mapping property error", e);
        }
    }

    /**
     * Converts remote names to local names and records local-to-remote entries.
     *
     * <p>Each remote name is first replaced through {@code nameMapping} (if it has an entry) and
     * then lower-cased when {@code isLowerCaseMetaNames} is set. An existing entry in
     * {@code localNameToRemoteName} is never overwritten. Conflicts are only detected when
     * lower-casing is enabled.
     *
     * @return a map with two entries: {@code "localNames"} (index-aligned with {@code remoteNames})
     *         and {@code "conflictNames"} (mapped names that collapse onto the same local name)
     */
    private Map<String, List<String>> nameListToMapping(List<String> remoteNames,
            ConcurrentHashMap<String, String> localNameToRemoteName,
            Map<String, String> nameMapping, boolean isLowerCaseMetaNames) {
        List<String> localNames = Lists.newArrayList();
        Set<String> lowerCaseNames = Sets.newHashSet();
        // local name -> every mapped name that produced it
        Map<String, List<String>> nameMap = Maps.newHashMap();
        List<String> conflictNames = Lists.newArrayList();

        for (String name : remoteNames) {
            String mappedName = nameMapping.getOrDefault(name, name);
            String localName = isLowerCaseMetaNames ? mappedName.toLowerCase() : mappedName;
            // Use computeIfAbsent to ensure atomicity
            localNameToRemoteName.computeIfAbsent(localName, k -> name);

            if (isLowerCaseMetaNames && !lowerCaseNames.add(localName)) {
                // Seen before, so the first occurrence already created the list below.
                nameMap.get(localName).add(mappedName);
            } else {
                nameMap.putIfAbsent(localName, Lists.newArrayList(Collections.singletonList(mappedName)));
            }
            localNames.add(localName);
        }

        for (List<String> conflictNameList : nameMap.values()) {
            if (conflictNameList.size() > 1) {
                conflictNames.addAll(conflictNameList);
            }
        }

        Map<String, List<String>> result = Maps.newConcurrentMap();
        result.put("localNames", localNames);
        result.put("conflictNames", conflictNames);
        return result;
    }
}

View File

@ -81,7 +81,7 @@ public class JdbcScanNode extends ExternalScanNode {
tbl = (JdbcTable) (desc.getTable());
}
jdbcType = tbl.getJdbcTableType();
tableName = tbl.getProperRealFullTableName(jdbcType);
tableName = tbl.getProperRemoteFullTableName(jdbcType);
}
@Override
@ -131,7 +131,7 @@ public class JdbcScanNode extends ExternalScanNode {
for (SlotRef slotRef : slotRefs) {
SlotRef slotRef1 = (SlotRef) slotRef.clone();
slotRef1.setTblName(null);
slotRef1.setLabel(JdbcTable.properNameWithRealName(jdbcType, slotRef1.getColumnName()));
slotRef1.setLabel(JdbcTable.properNameWithRemoteName(jdbcType, slotRef1.getColumnName()));
sMap.put(slotRef, slotRef1);
}
@ -171,7 +171,7 @@ public class JdbcScanNode extends ExternalScanNode {
continue;
}
Column col = slot.getColumn();
columns.add(tbl.getProperRealColumnName(jdbcType, col.getName()));
columns.add(tbl.getProperRemoteColumnName(jdbcType, col.getName()));
}
if (columns.isEmpty()) {
columns.add("*");

View File

@ -48,7 +48,7 @@ public class JdbcTableSink extends DataSink {
public JdbcTableSink(JdbcTable jdbcTable, List<String> insertCols) {
this.jdbcTable = jdbcTable;
jdbcType = jdbcTable.getJdbcTableType();
externalTableName = jdbcTable.getProperRealFullTableName(jdbcType);
externalTableName = jdbcTable.getProperRemoteFullTableName(jdbcType);
useTransaction = ConnectContext.get().getSessionVariable().isEnableOdbcTransaction();
dorisTableName = jdbcTable.getName();
insertSql = jdbcTable.getInsertSql(insertCols);