Revert "[branch-2.1][improvement](jdbc catalog) Optimize JdbcCatalog case mapping stability" (#41588)

Reverts apache/doris#41330
This commit is contained in:
zy-kkk
2024-10-09 20:51:58 +08:00
committed by GitHub
parent e218fd2314
commit ade86c0600
10 changed files with 409 additions and 371 deletions

View File

@ -45,7 +45,6 @@ import org.apache.doris.datasource.iceberg.IcebergExternalDatabase;
import org.apache.doris.datasource.infoschema.ExternalInfoSchemaDatabase;
import org.apache.doris.datasource.infoschema.ExternalMysqlDatabase;
import org.apache.doris.datasource.jdbc.JdbcExternalDatabase;
import org.apache.doris.datasource.mapping.IdentifierMapping;
import org.apache.doris.datasource.maxcompute.MaxComputeExternalDatabase;
import org.apache.doris.datasource.metacache.MetaCache;
import org.apache.doris.datasource.operations.ExternalMetadataOps;
@ -143,9 +142,6 @@ public abstract class ExternalCatalog
protected Optional<Boolean> useMetaCache = Optional.empty();
protected MetaCache<ExternalDatabase<? extends ExternalTable>> metaCache;
protected IdentifierMapping identifierMapping;
private boolean mappingsInitialized = false;
public ExternalCatalog() {
}
@ -178,10 +174,6 @@ public abstract class ExternalCatalog
}
}
// only for forward to master
protected void buildDatabaseMapping() {
}
// Will be called when creating catalog(so when as replaying)
// to add some default properties if missing.
public void setDefaultPropsIfMissing(boolean isReplay) {
@ -210,10 +202,6 @@ public abstract class ExternalCatalog
*/
public abstract List<String> listTableNames(SessionContext ctx, String dbName);
// only for forward to master
protected void buildTableMapping(SessionContext ctx, String dbName) {
}
/**
* check if the specified table exist.
*
@ -278,10 +266,6 @@ public abstract class ExternalCatalog
}
initialized = true;
}
if (!mappingsInitialized) {
buildDatabaseMapping();
mappingsInitialized = true;
}
}
protected final void initLocalObjects() {
@ -407,7 +391,6 @@ public abstract class ExternalCatalog
public void onRefresh(boolean invalidCache) {
this.objectCreated = false;
this.initialized = false;
this.mappingsInitialized = false;
synchronized (this.propLock) {
this.convertedProperties = null;
}
@ -733,7 +716,6 @@ public abstract class ExternalCatalog
}
this.propLock = new byte[0];
this.initialized = false;
this.mappingsInitialized = false;
setDefaultPropsIfMissing(true);
}

View File

@ -91,8 +91,6 @@ public abstract class ExternalDatabase<T extends ExternalTable>
private MetaCache<T> metaCache;
private boolean mappingsInitialized = false;
/**
* Create external database.
*
@ -119,7 +117,6 @@ public abstract class ExternalDatabase<T extends ExternalTable>
public void setUnInitialized(boolean invalidCache) {
this.initialized = false;
this.mappingsInitialized = false;
this.invalidCacheInInit = invalidCache;
if (extCatalog.getUseMetaCache().isPresent()) {
if (extCatalog.getUseMetaCache().get() && metaCache != null) {
@ -173,10 +170,6 @@ public abstract class ExternalDatabase<T extends ExternalTable>
}
initialized = true;
}
if (!mappingsInitialized) {
extCatalog.buildTableMapping(null, name);
mappingsInitialized = true;
}
}
public void replayInitDb(InitDatabaseLog log, ExternalCatalog catalog) {

View File

@ -31,7 +31,6 @@ import org.apache.doris.datasource.SessionContext;
import org.apache.doris.datasource.jdbc.client.JdbcClient;
import org.apache.doris.datasource.jdbc.client.JdbcClientConfig;
import org.apache.doris.datasource.jdbc.client.JdbcClientException;
import org.apache.doris.datasource.mapping.DefaultIdentifierMapping;
import org.apache.doris.proto.InternalService;
import org.apache.doris.proto.InternalService.PJdbcTestConnectionRequest;
import org.apache.doris.proto.InternalService.PJdbcTestConnectionResult;
@ -119,16 +118,19 @@ public class JdbcExternalCatalog extends ExternalCatalog {
super.onRefresh(invalidCache);
if (jdbcClient != null) {
jdbcClient.closeClient();
jdbcClient = null;
}
}
@Override
public void onRefreshCache(boolean invalidCache) {
onRefresh(invalidCache);
}
@Override
public void onClose() {
super.onClose();
if (jdbcClient != null) {
jdbcClient.closeClient();
jdbcClient = null;
}
}
@ -229,6 +231,8 @@ public class JdbcExternalCatalog extends ExternalCatalog {
.setDriverUrl(getDriverUrl())
.setDriverClass(getDriverClass())
.setOnlySpecifiedDatabase(getOnlySpecifiedDatabase())
.setIsLowerCaseMetaNames(getLowerCaseMetaNames())
.setMetaNamesMapping(getMetaNamesMapping())
.setIncludeDatabaseMap(getIncludeDatabaseMap())
.setExcludeDatabaseMap(getExcludeDatabaseMap())
.setConnectionPoolMinSize(getConnectionPoolMinSize())
@ -238,62 +242,22 @@ public class JdbcExternalCatalog extends ExternalCatalog {
.setConnectionPoolKeepAlive(isConnectionPoolKeepAlive());
jdbcClient = JdbcClient.createJdbcClient(jdbcClientConfig);
identifierMapping = new DefaultIdentifierMapping(Boolean.parseBoolean(getLowerCaseMetaNames()),
getMetaNamesMapping());
}
@Override
protected List<String> listDatabaseNames() {
return identifierMapping.fromRemoteDatabaseName(jdbcClient.getDatabaseNameList());
}
@Override
protected void buildDatabaseMapping() {
identifierMapping.fromRemoteDatabaseName(jdbcClient.getDatabaseNameList());
}
protected String getRemoteDatabaseName(String dbName) {
return identifierMapping.toRemoteDatabaseName(dbName);
return jdbcClient.getDatabaseNameList();
}
@Override
public List<String> listTableNames(SessionContext ctx, String dbName) {
makeSureInitialized();
String remoteDbName = getRemoteDatabaseName(dbName);
return identifierMapping.fromRemoteTableName(remoteDbName, jdbcClient.getTablesNameList(remoteDbName));
}
@Override
protected void buildTableMapping(SessionContext ctx, String dbName) {
String remoteDbName = getRemoteDatabaseName(dbName);
identifierMapping.fromRemoteTableName(getRemoteDatabaseName(dbName),
jdbcClient.getTablesNameList(remoteDbName));
}
protected String getRemoteTableName(String dbName, String tblName) {
return identifierMapping.toRemoteTableName(getRemoteDatabaseName(dbName), tblName);
return jdbcClient.getTablesNameList(dbName);
}
@Override
public boolean tableExist(SessionContext ctx, String dbName, String tblName) {
makeSureInitialized();
String remoteDbName = getRemoteDatabaseName(dbName);
String remoteTblName = getRemoteTableName(dbName, tblName);
return jdbcClient.isTableExist(remoteDbName, remoteTblName);
}
public List<Column> listColumns(String dbName, String tblName) {
makeSureInitialized();
String remoteDbName = getRemoteDatabaseName(dbName);
String remoteTblName = getRemoteTableName(dbName, tblName);
return identifierMapping.fromRemoteColumnName(remoteDbName, remoteTblName,
jdbcClient.getColumnsFromJdbc(remoteDbName,
remoteTblName));
}
protected Map<String, String> getRemoteColumnNames(String dbName, String tblName) {
return identifierMapping.toRemoteColumnNames(getRemoteDatabaseName(dbName),
getRemoteTableName(dbName, tblName));
return jdbcClient.isTableExist(dbName, tblName);
}
@Override

View File

@ -32,7 +32,6 @@ import org.apache.doris.statistics.ResultRow;
import org.apache.doris.statistics.util.StatisticsUtil;
import org.apache.doris.thrift.TTableDescriptor;
import com.google.common.collect.Maps;
import org.apache.commons.text.StringSubstitutor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -87,29 +86,21 @@ public class JdbcExternalTable extends ExternalTable {
@Override
public Optional<SchemaCacheValue> initSchema() {
return Optional.of(new SchemaCacheValue(((JdbcExternalCatalog) catalog).listColumns(dbName, name)));
return Optional.of(new SchemaCacheValue(((JdbcExternalCatalog) catalog).getJdbcClient()
.getColumnsFromJdbc(dbName, name)));
}
private JdbcTable toJdbcTable() {
List<Column> schema = getFullSchema();
JdbcExternalCatalog jdbcCatalog = (JdbcExternalCatalog) catalog;
String fullTableName = this.dbName + "." + this.name;
JdbcTable jdbcTable = new JdbcTable(this.id, fullTableName, schema, TableType.JDBC_EXTERNAL_TABLE);
jdbcCatalog.configureJdbcTable(jdbcTable, fullTableName);
String fullDbName = this.dbName + "." + this.name;
JdbcTable jdbcTable = new JdbcTable(this.id, fullDbName, schema, TableType.JDBC_EXTERNAL_TABLE);
jdbcCatalog.configureJdbcTable(jdbcTable, fullDbName);
// Set remote properties
jdbcTable.setRemoteDatabaseName(jdbcCatalog.getRemoteDatabaseName(this.dbName));
jdbcTable.setRemoteTableName(jdbcCatalog.getRemoteTableName(this.dbName, this.name));
Map<String, String> remoteColumnNames = jdbcCatalog.getRemoteColumnNames(this.dbName, this.name);
if (!remoteColumnNames.isEmpty()) {
jdbcTable.setRemoteColumnNames(remoteColumnNames);
} else {
remoteColumnNames = Maps.newHashMap();
for (Column column : schema) {
remoteColumnNames.put(column.getName(), column.getName());
}
jdbcTable.setRemoteColumnNames(remoteColumnNames);
}
jdbcTable.setRemoteDatabaseName(jdbcCatalog.getJdbcClient().getRemoteDatabaseName(this.dbName));
jdbcTable.setRemoteTableName(jdbcCatalog.getJdbcClient().getRemoteTableName(this.dbName, this.name));
jdbcTable.setRemoteColumnNames(jdbcCatalog.getJdbcClient().getRemoteColumnNames(this.dbName, this.name));
return jdbcTable;
}

View File

@ -0,0 +1,45 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.datasource.jdbc;
import org.apache.doris.datasource.jdbc.client.JdbcClient;
import org.apache.doris.datasource.mapping.IdentifierMapping;
public class JdbcIdentifierMapping extends IdentifierMapping {
private final JdbcClient jdbcClient;
public JdbcIdentifierMapping(boolean isLowerCaseMetaNames, String metaNamesMapping, JdbcClient jdbcClient) {
super(isLowerCaseMetaNames, metaNamesMapping);
this.jdbcClient = jdbcClient;
}
@Override
protected void loadDatabaseNames() {
jdbcClient.getDatabaseNameList();
}
@Override
protected void loadTableNames(String localDbName) {
jdbcClient.getTablesNameList(localDbName);
}
@Override
protected void loadColumnNames(String localDbName, String localTableName) {
jdbcClient.getColumnsFromJdbc(localDbName, localTableName);
}
}

View File

@ -23,6 +23,7 @@ import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.jdbc.JdbcIdentifierMapping;
import org.apache.doris.datasource.jdbc.util.JdbcFieldSchema;
import com.google.common.collect.ImmutableSet;
@ -61,8 +62,11 @@ public abstract class JdbcClient {
protected ClassLoader classLoader = null;
protected HikariDataSource dataSource = null;
protected boolean isOnlySpecifiedDatabase;
protected boolean isLowerCaseMetaNames;
protected String metaNamesMapping;
protected Map<String, Boolean> includeDatabaseMap;
protected Map<String, Boolean> excludeDatabaseMap;
protected JdbcIdentifierMapping jdbcLowerCaseMetaMatching;
public static JdbcClient createJdbcClient(JdbcClientConfig jdbcClientConfig) {
String dbType = parseDbType(jdbcClientConfig.getJdbcUrl());
@ -97,6 +101,8 @@ public abstract class JdbcClient {
this.catalogName = jdbcClientConfig.getCatalog();
this.jdbcUser = jdbcClientConfig.getUser();
this.isOnlySpecifiedDatabase = Boolean.parseBoolean(jdbcClientConfig.getOnlySpecifiedDatabase());
this.isLowerCaseMetaNames = Boolean.parseBoolean(jdbcClientConfig.getIsLowerCaseMetaNames());
this.metaNamesMapping = jdbcClientConfig.getMetaNamesMapping();
this.includeDatabaseMap =
Optional.ofNullable(jdbcClientConfig.getIncludeDatabaseMap()).orElse(Collections.emptyMap());
this.excludeDatabaseMap =
@ -105,6 +111,7 @@ public abstract class JdbcClient {
this.dbType = parseDbType(jdbcUrl);
initializeClassLoader(jdbcClientConfig);
initializeDataSource(jdbcClientConfig);
this.jdbcLowerCaseMetaMatching = new JdbcIdentifierMapping(isLowerCaseMetaNames, metaNamesMapping, this);
}
// Initialize DataSource
@ -287,9 +294,10 @@ public abstract class JdbcClient {
/**
* get all tables of one database
*/
public List<String> getTablesNameList(String remoteDbName) {
public List<String> getTablesNameList(String localDbName) {
List<String> remoteTablesNames = Lists.newArrayList();
String[] tableTypes = getTableTypes();
String remoteDbName = getRemoteDatabaseName(localDbName);
processTable(remoteDbName, null, tableTypes, (rs) -> {
try {
while (rs.next()) {
@ -299,12 +307,14 @@ public abstract class JdbcClient {
throw new JdbcClientException("failed to get all tables for remote database: `%s`", e, remoteDbName);
}
});
return remoteTablesNames;
return filterTableNames(remoteDbName, remoteTablesNames);
}
public boolean isTableExist(String remoteDbName, String remoteTableName) {
public boolean isTableExist(String localDbName, String localTableName) {
final boolean[] isExist = {false};
String[] tableTypes = getTableTypes();
String remoteDbName = getRemoteDatabaseName(localDbName);
String remoteTableName = getRemoteTableName(localDbName, localTableName);
processTable(remoteDbName, remoteTableName, tableTypes, (rs) -> {
try {
if (rs.next()) {
@ -321,10 +331,12 @@ public abstract class JdbcClient {
/**
* get all columns of one table
*/
public List<JdbcFieldSchema> getJdbcColumnsInfo(String remoteDbName, String remoteTableName) {
public List<JdbcFieldSchema> getJdbcColumnsInfo(String localDbName, String localTableName) {
Connection conn = getConnection();
ResultSet rs = null;
List<JdbcFieldSchema> tableSchema = Lists.newArrayList();
String remoteDbName = getRemoteDatabaseName(localDbName);
String remoteTableName = getRemoteTableName(localDbName, localTableName);
try {
DatabaseMetaData databaseMetaData = conn.getMetaData();
String catalogName = getCatalogName(conn);
@ -350,7 +362,21 @@ public abstract class JdbcClient {
field.isAllowNull(), field.getRemarks(),
true, -1));
}
return dorisTableSchema;
String remoteDbName = getRemoteDatabaseName(localDbName);
String remoteTableName = getRemoteTableName(localDbName, localTableName);
return filterColumnName(remoteDbName, remoteTableName, dorisTableSchema);
}
public String getRemoteDatabaseName(String localDbname) {
return jdbcLowerCaseMetaMatching.getRemoteDatabaseName(localDbname);
}
public String getRemoteTableName(String localDbName, String localTableName) {
return jdbcLowerCaseMetaMatching.getRemoteTableName(localDbName, localTableName);
}
public Map<String, String> getRemoteColumnNames(String localDbName, String localTableName) {
return jdbcLowerCaseMetaMatching.getRemoteColumnNames(localDbName, localTableName);
}
// protected methods,for subclass to override
@ -408,7 +434,7 @@ public abstract class JdbcClient {
}
filteredDatabaseNames.add(databaseName);
}
return filteredDatabaseNames;
return jdbcLowerCaseMetaMatching.setDatabaseNameMapping(filteredDatabaseNames);
}
protected Set<String> getFilterInternalDatabases() {
@ -419,6 +445,14 @@ public abstract class JdbcClient {
.build();
}
protected List<String> filterTableNames(String remoteDbName, List<String> remoteTableNames) {
return jdbcLowerCaseMetaMatching.setTableNameMapping(remoteDbName, remoteTableNames);
}
protected List<Column> filterColumnName(String remoteDbName, String remoteTableName, List<Column> remoteColumns) {
return jdbcLowerCaseMetaMatching.setColumnNameMapping(remoteDbName, remoteTableName, remoteColumns);
}
protected abstract Type jdbcTypeToDoris(JdbcFieldSchema fieldSchema);
protected Type createDecimalOrStringType(int precision, int scale) {

View File

@ -129,10 +129,12 @@ public class JdbcMySQLClient extends JdbcClient {
* get all columns of one table
*/
@Override
public List<JdbcFieldSchema> getJdbcColumnsInfo(String remoteDbName, String remoteTableName) {
public List<JdbcFieldSchema> getJdbcColumnsInfo(String localDbName, String localTableName) {
Connection conn = getConnection();
ResultSet rs = null;
List<JdbcFieldSchema> tableSchema = Lists.newArrayList();
String remoteDbName = getRemoteDatabaseName(localDbName);
String remoteTableName = getRemoteTableName(localDbName, localTableName);
try {
DatabaseMetaData databaseMetaData = conn.getMetaData();
String catalogName = getCatalogName(conn);

View File

@ -49,10 +49,12 @@ public class JdbcOracleClient extends JdbcClient {
}
@Override
public List<JdbcFieldSchema> getJdbcColumnsInfo(String remoteDbName, String remoteTableName) {
public List<JdbcFieldSchema> getJdbcColumnsInfo(String localDbName, String localTableName) {
Connection conn = getConnection();
ResultSet rs = null;
List<JdbcFieldSchema> tableSchema = Lists.newArrayList();
String remoteDbName = getRemoteDatabaseName(localDbName);
String remoteTableName = getRemoteTableName(localDbName, localTableName);
try {
DatabaseMetaData databaseMetaData = conn.getMetaData();
String catalogName = getCatalogName(conn);

View File

@ -1,268 +0,0 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.datasource.mapping;
import org.apache.doris.catalog.Column;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
public class DefaultIdentifierMapping implements IdentifierMapping {
private static final Logger LOG = LogManager.getLogger(DefaultIdentifierMapping.class);
private final ObjectMapper mapper = new ObjectMapper();
private final ConcurrentHashMap<String, String> localDBToRemoteDB = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, ConcurrentHashMap<String, String>> localTableToRemoteTable
= new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, ConcurrentHashMap<String, ConcurrentHashMap<String, String>>>
localColumnToRemoteColumn = new ConcurrentHashMap<>();
private final boolean isLowerCaseMetaNames;
private final String metaNamesMapping;
public DefaultIdentifierMapping(boolean isLowerCaseMetaNames, String metaNamesMapping) {
this.isLowerCaseMetaNames = isLowerCaseMetaNames;
this.metaNamesMapping = metaNamesMapping;
}
private boolean isMappingInvalid() {
return metaNamesMapping == null || metaNamesMapping.isEmpty();
}
@Override
public List<String> fromRemoteDatabaseName(List<String> remoteDatabaseNames) {
// If mapping is not required, return the original input
if (!isLowerCaseMetaNames && isMappingInvalid()) {
return remoteDatabaseNames;
}
JsonNode databasesNode = readAndParseJson(metaNamesMapping, "databases");
Map<String, String> databaseNameMapping = Maps.newTreeMap();
if (databasesNode.isArray()) {
for (JsonNode node : databasesNode) {
String remoteDatabase = node.path("remoteDatabase").asText();
String mapping = node.path("mapping").asText();
databaseNameMapping.put(remoteDatabase, mapping);
}
}
Map<String, List<String>> result = nameListToMapping(remoteDatabaseNames, localDBToRemoteDB,
databaseNameMapping, isLowerCaseMetaNames);
List<String> localDatabaseNames = result.get("localNames");
List<String> conflictNames = result.get("conflictNames");
if (!conflictNames.isEmpty()) {
throw new RuntimeException(
"Conflict database/schema names found when lower_case_meta_names is true: " + conflictNames
+ ". Please set lower_case_meta_names to false or"
+ " use meta_name_mapping to specify the names.");
}
return localDatabaseNames;
}
@Override
public List<String> fromRemoteTableName(String remoteDbName, List<String> remoteTableNames) {
// If mapping is not required, return the original input
if (!isLowerCaseMetaNames && isMappingInvalid()) {
return remoteTableNames;
}
JsonNode tablesNode = readAndParseJson(metaNamesMapping, "tables");
Map<String, String> tableNameMapping = Maps.newTreeMap();
if (tablesNode.isArray()) {
for (JsonNode node : tablesNode) {
String remoteDatabase = node.path("remoteDatabase").asText();
if (remoteDbName.equals(remoteDatabase)) {
String remoteTable = node.path("remoteTable").asText();
String mapping = node.path("mapping").asText();
tableNameMapping.put(remoteTable, mapping);
}
}
}
localTableToRemoteTable.putIfAbsent(remoteDbName, new ConcurrentHashMap<>());
Map<String, List<String>> result = nameListToMapping(remoteTableNames,
localTableToRemoteTable.get(remoteDbName),
tableNameMapping, isLowerCaseMetaNames);
List<String> localTableNames = result.get("localNames");
List<String> conflictNames = result.get("conflictNames");
if (!conflictNames.isEmpty()) {
throw new RuntimeException(
"Conflict table names found in remote database/schema: " + remoteDbName
+ " when lower_case_meta_names is true: " + conflictNames
+ ". Please set lower_case_meta_names to false or"
+ " use meta_name_mapping to specify the table names.");
}
return localTableNames;
}
@Override
public List<Column> fromRemoteColumnName(String remoteDatabaseName, String remoteTableName,
List<Column> remoteColumns) {
// If mapping is not required, return the original input
if (!isLowerCaseMetaNames && isMappingInvalid()) {
return remoteColumns;
}
JsonNode tablesNode = readAndParseJson(metaNamesMapping, "columns");
Map<String, String> columnNameMapping = Maps.newTreeMap();
if (tablesNode.isArray()) {
for (JsonNode node : tablesNode) {
String remoteDatabase = node.path("remoteDatabase").asText();
String remoteTable = node.path("remoteTable").asText();
if (remoteDatabaseName.equals(remoteDatabase) && remoteTable.equals(remoteTableName)) {
String remoteColumn = node.path("remoteColumn").asText();
String mapping = node.path("mapping").asText();
columnNameMapping.put(remoteColumn, mapping);
}
}
}
localColumnToRemoteColumn.putIfAbsent(remoteDatabaseName, new ConcurrentHashMap<>());
localColumnToRemoteColumn.get(remoteDatabaseName).putIfAbsent(remoteTableName, new ConcurrentHashMap<>());
List<String> remoteColumnNames = Lists.newArrayList();
for (Column remoteColumn : remoteColumns) {
remoteColumnNames.add(remoteColumn.getName());
}
Map<String, List<String>> result = nameListToMapping(remoteColumnNames,
localColumnToRemoteColumn.get(remoteDatabaseName).get(remoteTableName),
columnNameMapping, isLowerCaseMetaNames);
List<String> localColumnNames = result.get("localNames");
List<String> conflictNames = result.get("conflictNames");
if (!conflictNames.isEmpty()) {
throw new RuntimeException(
"Conflict column names found in remote database/schema: " + remoteDatabaseName
+ " in remote table: " + remoteTableName
+ " when lower_case_meta_names is true: " + conflictNames
+ ". Please set lower_case_meta_names to false or"
+ " use meta_name_mapping to specify the column names.");
}
for (int i = 0; i < remoteColumns.size(); i++) {
remoteColumns.get(i).setName(localColumnNames.get(i));
}
return remoteColumns;
}
@Override
public String toRemoteDatabaseName(String localDatabaseName) {
// If mapping is not required, return the original input
if (!isLowerCaseMetaNames && isMappingInvalid()) {
return localDatabaseName;
}
return getRequiredMapping(localDBToRemoteDB, localDatabaseName, "database", localDatabaseName);
}
@Override
public String toRemoteTableName(String remoteDatabaseName, String localTableName) {
// If mapping is not required, return the original input
if (!isLowerCaseMetaNames && isMappingInvalid()) {
return localTableName;
}
Map<String, String> tableMap = localTableToRemoteTable.computeIfAbsent(remoteDatabaseName,
k -> new ConcurrentHashMap<>());
return getRequiredMapping(tableMap, localTableName, "table", localTableName);
}
@Override
public Map<String, String> toRemoteColumnNames(String remoteDatabaseName, String remoteTableName) {
// If mapping is not required, return an empty map (since there's no mapping)
if (!isLowerCaseMetaNames && isMappingInvalid()) {
return Collections.emptyMap();
}
ConcurrentHashMap<String, ConcurrentHashMap<String, String>> tableColumnMap
= localColumnToRemoteColumn.computeIfAbsent(remoteDatabaseName, k -> new ConcurrentHashMap<>());
Map<String, String> columnMap = tableColumnMap.computeIfAbsent(remoteTableName, k -> new ConcurrentHashMap<>());
if (columnMap.isEmpty()) {
LOG.warn("No remote column found for: {}. Please refresh this catalog.", remoteTableName);
throw new RuntimeException(
"No remote column found for: " + remoteTableName + ". Please refresh this catalog.");
}
return columnMap;
}
private <K, V> V getRequiredMapping(Map<K, V> map, K key, String typeName, String entityName) {
V value = map.get(key);
if (value == null) {
LOG.warn("No remote {} found for {}: {}. Please refresh this catalog.", typeName, typeName, entityName);
throw new RuntimeException("No remote " + typeName + " found for " + typeName + ": " + entityName
+ ". Please refresh this catalog.");
}
return value;
}
private JsonNode readAndParseJson(String jsonPath, String nodeName) {
JsonNode rootNode;
try {
rootNode = mapper.readTree(jsonPath);
return rootNode.path(nodeName);
} catch (JsonProcessingException e) {
throw new RuntimeException("parse meta_names_mapping property error", e);
}
}
private Map<String, List<String>> nameListToMapping(List<String> remoteNames,
ConcurrentHashMap<String, String> localNameToRemoteName,
Map<String, String> nameMapping, boolean isLowerCaseMetaNames) {
List<String> filteredDatabaseNames = Lists.newArrayList();
Set<String> lowerCaseNames = Sets.newHashSet();
Map<String, List<String>> nameMap = Maps.newHashMap();
List<String> conflictNames = Lists.newArrayList();
for (String name : remoteNames) {
String mappedName = nameMapping.getOrDefault(name, name);
String localName = isLowerCaseMetaNames ? mappedName.toLowerCase() : mappedName;
localNameToRemoteName.computeIfAbsent(localName, k -> name);
if (isLowerCaseMetaNames && !lowerCaseNames.add(localName)) {
if (nameMap.containsKey(localName)) {
nameMap.get(localName).add(mappedName);
}
} else {
nameMap.putIfAbsent(localName, Lists.newArrayList(Collections.singletonList(mappedName)));
}
filteredDatabaseNames.add(localName);
}
for (List<String> conflictNameList : nameMap.values()) {
if (conflictNameList.size() > 1) {
conflictNames.addAll(conflictNameList);
}
}
Map<String, List<String>> result = Maps.newConcurrentMap();
result.put("localNames", filteredDatabaseNames);
result.put("conflictNames", conflictNames);
return result;
}
}

View File

@ -18,20 +18,313 @@
package org.apache.doris.datasource.mapping;
import org.apache.doris.catalog.Column;
import org.apache.doris.qe.GlobalVariable;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
public interface IdentifierMapping {
List<String> fromRemoteDatabaseName(List<String> remoteDatabaseNames);
public abstract class IdentifierMapping {
private static final Logger LOG = LogManager.getLogger(IdentifierMapping.class);
List<String> fromRemoteTableName(String remoteDatabaseName, List<String> remoteTableNames);
private final ObjectMapper mapper = new ObjectMapper();
private final ConcurrentHashMap<String, String> localDBToRemoteDB = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, ConcurrentHashMap<String, String>> localTableToRemoteTable
= new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, ConcurrentHashMap<String, ConcurrentHashMap<String, String>>>
localColumnToRemoteColumn = new ConcurrentHashMap<>();
List<Column> fromRemoteColumnName(String remoteDatabaseName, String remoteTableName, List<Column> remoteColumns);
private final AtomicBoolean dbNamesLoaded = new AtomicBoolean(false);
private final ConcurrentHashMap<String, AtomicBoolean> tableNamesLoadedMap = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, ConcurrentHashMap<String, AtomicBoolean>> columnNamesLoadedMap
= new ConcurrentHashMap<>();
String toRemoteDatabaseName(String localDatabaseName);
private final boolean isLowerCaseMetaNames;
private final String metaNamesMapping;
String toRemoteTableName(String remoteDatabaseName, String localTableName);
public IdentifierMapping(boolean isLowerCaseMetaNames, String metaNamesMapping) {
this.isLowerCaseMetaNames = isLowerCaseMetaNames;
this.metaNamesMapping = metaNamesMapping;
}
Map<String, String> toRemoteColumnNames(String remoteDatabaseName, String remoteTableName);
public List<String> setDatabaseNameMapping(List<String> remoteDatabaseNames) {
JsonNode databasesNode = readAndParseJson(metaNamesMapping, "databases");
Map<String, String> databaseNameMapping = Maps.newTreeMap();
if (databasesNode.isArray()) {
for (JsonNode node : databasesNode) {
String remoteDatabase = node.path("remoteDatabase").asText();
String mapping = node.path("mapping").asText();
databaseNameMapping.put(remoteDatabase, mapping);
}
}
Map<String, List<String>> result = nameListToMapping(remoteDatabaseNames, localDBToRemoteDB,
databaseNameMapping, isLowerCaseMetaNames);
List<String> localDatabaseNames = result.get("localNames");
List<String> conflictNames = result.get("conflictNames");
if (!conflictNames.isEmpty()) {
throw new RuntimeException(
"Conflict database/schema names found when lower_case_meta_names is true: " + conflictNames
+ ". Please set lower_case_meta_names to false or"
+ " use meta_name_mapping to specify the names.");
}
return localDatabaseNames;
}
public List<String> setTableNameMapping(String remoteDbName, List<String> remoteTableNames) {
JsonNode tablesNode = readAndParseJson(metaNamesMapping, "tables");
Map<String, String> tableNameMapping = Maps.newTreeMap();
if (tablesNode.isArray()) {
for (JsonNode node : tablesNode) {
String remoteDatabase = node.path("remoteDatabase").asText();
if (remoteDbName.equals(remoteDatabase)) {
String remoteTable = node.path("remoteTable").asText();
String mapping = node.path("mapping").asText();
tableNameMapping.put(remoteTable, mapping);
}
}
}
localTableToRemoteTable.putIfAbsent(remoteDbName, new ConcurrentHashMap<>());
List<String> localTableNames;
List<String> conflictNames;
if (GlobalVariable.lowerCaseTableNames == 1) {
Map<String, List<String>> result = nameListToMapping(remoteTableNames,
localTableToRemoteTable.get(remoteDbName),
tableNameMapping, true);
localTableNames = result.get("localNames");
conflictNames = result.get("conflictNames");
if (!conflictNames.isEmpty()) {
throw new RuntimeException(
"Conflict table names found in remote database/schema: " + remoteDbName
+ " when lower_case_table_names is 1: " + conflictNames
+ ". Please use meta_name_mapping to specify the names.");
}
} else {
Map<String, List<String>> result = nameListToMapping(remoteTableNames,
localTableToRemoteTable.get(remoteDbName),
tableNameMapping, isLowerCaseMetaNames);
localTableNames = result.get("localNames");
conflictNames = result.get("conflictNames");
if (!conflictNames.isEmpty()) {
throw new RuntimeException(
"Conflict table names found in remote database/schema: " + remoteDbName
+ "when lower_case_meta_names is true: " + conflictNames
+ ". Please set lower_case_meta_names to false or"
+ " use meta_name_mapping to specify the table names.");
}
}
return localTableNames;
}
public List<Column> setColumnNameMapping(String remoteDbName, String remoteTableName, List<Column> remoteColumns) {
JsonNode tablesNode = readAndParseJson(metaNamesMapping, "columns");
Map<String, String> columnNameMapping = Maps.newTreeMap();
if (tablesNode.isArray()) {
for (JsonNode node : tablesNode) {
String remoteDatabase = node.path("remoteDatabase").asText();
String remoteTable = node.path("remoteTable").asText();
if (remoteDbName.equals(remoteDatabase) && remoteTable.equals(remoteTableName)) {
String remoteColumn = node.path("remoteColumn").asText();
String mapping = node.path("mapping").asText();
columnNameMapping.put(remoteColumn, mapping);
}
}
}
localColumnToRemoteColumn.putIfAbsent(remoteDbName, new ConcurrentHashMap<>());
localColumnToRemoteColumn.get(remoteDbName).putIfAbsent(remoteTableName, new ConcurrentHashMap<>());
List<String> localColumnNames;
List<String> conflictNames;
// Get the name from localColumns and save it to List<String>
List<String> remoteColumnNames = Lists.newArrayList();
for (Column remoteColumn : remoteColumns) {
remoteColumnNames.add(remoteColumn.getName());
}
Map<String, List<String>> result = nameListToMapping(remoteColumnNames,
localColumnToRemoteColumn.get(remoteDbName).get(remoteTableName),
columnNameMapping, isLowerCaseMetaNames);
localColumnNames = result.get("localNames");
conflictNames = result.get("conflictNames");
if (!conflictNames.isEmpty()) {
throw new RuntimeException(
"Conflict column names found in remote database/schema: " + remoteDbName
+ " in remote table: " + remoteTableName
+ " when lower_case_meta_names is true: " + conflictNames
+ ". Please set lower_case_meta_names to false or"
+ " use meta_name_mapping to specify the column names.");
}
// Replace the name in remoteColumns with localColumnNames
for (int i = 0; i < remoteColumns.size(); i++) {
remoteColumns.get(i).setName(localColumnNames.get(i));
}
return remoteColumns;
}
public String getRemoteDatabaseName(String localDbName) {
return getRequiredMapping(localDBToRemoteDB, localDbName, "database", this::loadDatabaseNamesIfNeeded,
localDbName);
}
public String getRemoteTableName(String localDbName, String localTableName) {
String remoteDbName = getRemoteDatabaseName(localDbName);
Map<String, String> tableMap = localTableToRemoteTable.computeIfAbsent(remoteDbName,
k -> new ConcurrentHashMap<>());
return getRequiredMapping(tableMap, localTableName, "table", () -> loadTableNamesIfNeeded(localDbName),
localTableName);
}
public Map<String, String> getRemoteColumnNames(String localDbName, String localTableName) {
String remoteDbName = getRemoteDatabaseName(localDbName);
String remoteTableName = getRemoteTableName(localDbName, localTableName);
ConcurrentHashMap<String, ConcurrentHashMap<String, String>> tableColumnMap
= localColumnToRemoteColumn.computeIfAbsent(remoteDbName, k -> new ConcurrentHashMap<>());
Map<String, String> columnMap = tableColumnMap.computeIfAbsent(remoteTableName, k -> new ConcurrentHashMap<>());
if (columnMap.isEmpty()) {
LOG.info("Column name mapping missing, loading column names for localDbName: {}, localTableName: {}",
localDbName, localTableName);
loadColumnNamesIfNeeded(localDbName, localTableName);
columnMap = tableColumnMap.get(remoteTableName);
}
if (columnMap.isEmpty()) {
LOG.warn("No remote column found for localTableName: {}. Please refresh this catalog.", localTableName);
throw new RuntimeException(
"No remote column found for localTableName: " + localTableName + ". Please refresh this catalog.");
}
return columnMap;
}
private void loadDatabaseNamesIfNeeded() {
if (dbNamesLoaded.compareAndSet(false, true)) {
try {
loadDatabaseNames();
} catch (Exception e) {
dbNamesLoaded.set(false); // Reset on failure
LOG.warn("Error loading database names", e);
}
}
}
private void loadTableNamesIfNeeded(String localDbName) {
AtomicBoolean isLoaded = tableNamesLoadedMap.computeIfAbsent(localDbName, k -> new AtomicBoolean(false));
if (isLoaded.compareAndSet(false, true)) {
try {
loadTableNames(localDbName);
} catch (Exception e) {
tableNamesLoadedMap.get(localDbName).set(false); // Reset on failure
LOG.warn("Error loading table names for localDbName: {}", localDbName, e);
}
}
}
private void loadColumnNamesIfNeeded(String localDbName, String localTableName) {
columnNamesLoadedMap.putIfAbsent(localDbName, new ConcurrentHashMap<>());
AtomicBoolean isLoaded = columnNamesLoadedMap.get(localDbName)
.computeIfAbsent(localTableName, k -> new AtomicBoolean(false));
if (isLoaded.compareAndSet(false, true)) {
try {
loadColumnNames(localDbName, localTableName);
} catch (Exception e) {
columnNamesLoadedMap.get(localDbName).get(localTableName).set(false); // Reset on failure
LOG.warn("Error loading column names for localDbName: {}, localTableName: {}", localDbName,
localTableName, e);
}
}
}
private <K, V> V getRequiredMapping(Map<K, V> map, K key, String typeName, Runnable loadIfNeeded,
String entityName) {
if (map.isEmpty() || !map.containsKey(key) || map.get(key) == null) {
LOG.info("{} mapping missing, loading for {}: {}", typeName, typeName, entityName);
loadIfNeeded.run();
}
V value = map.get(key);
if (value == null) {
LOG.warn("No remote {} found for {}: {}. Please refresh this catalog.", typeName, typeName, entityName);
throw new RuntimeException("No remote " + typeName + " found for " + typeName + ": " + entityName
+ ". Please refresh this catalog.");
}
return value;
}
// Load the database name from the data source.
// In the corresponding getDatabaseNameList(), setDatabaseNameMapping() must be used to update the mapping.
protected abstract void loadDatabaseNames();
// Load the table names for the specified database from the data source.
// In the corresponding getTableNameList(), setTableNameMapping() must be used to update the mapping.
protected abstract void loadTableNames(String localDbName);
// Load the column names for a specified table in a database from the data source.
// In the corresponding getColumnNameList(), setColumnNameMapping() must be used to update the mapping.
protected abstract void loadColumnNames(String localDbName, String localTableName);
private JsonNode readAndParseJson(String jsonPath, String nodeName) {
JsonNode rootNode;
try {
rootNode = mapper.readTree(jsonPath);
return rootNode.path(nodeName);
} catch (JsonProcessingException e) {
throw new RuntimeException("parse meta_names_mapping property error", e);
}
}
private Map<String, List<String>> nameListToMapping(List<String> remoteNames,
ConcurrentHashMap<String, String> localNameToRemoteName,
Map<String, String> nameMapping, boolean isLowerCaseMetaNames) {
List<String> filteredDatabaseNames = Lists.newArrayList();
Set<String> lowerCaseNames = Sets.newHashSet();
Map<String, List<String>> nameMap = Maps.newHashMap();
List<String> conflictNames = Lists.newArrayList();
for (String name : remoteNames) {
String mappedName = nameMapping.getOrDefault(name, name);
String localName = isLowerCaseMetaNames ? mappedName.toLowerCase() : mappedName;
// Use computeIfAbsent to ensure atomicity
localNameToRemoteName.computeIfAbsent(localName, k -> name);
if (isLowerCaseMetaNames && !lowerCaseNames.add(localName)) {
if (nameMap.containsKey(localName)) {
nameMap.get(localName).add(mappedName);
}
} else {
nameMap.putIfAbsent(localName, Lists.newArrayList(Collections.singletonList(mappedName)));
}
filteredDatabaseNames.add(localName);
}
for (List<String> conflictNameList : nameMap.values()) {
if (conflictNameList.size() > 1) {
conflictNames.addAll(conflictNameList);
}
}
Map<String, List<String>> result = Maps.newConcurrentMap();
result.put("localNames", filteredDatabaseNames);
result.put("conflictNames", conflictNames);
return result;
}
}