[Enhancement](external catalog) Added status reset when jdbc name mapping is abnormal (#34129)
This commit is contained in:
@ -26,6 +26,8 @@ import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.collect.Sets;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
@ -35,6 +37,7 @@ import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
public abstract class IdentifierMapping {
|
||||
private static final Logger LOG = LogManager.getLogger(IdentifierMapping.class);
|
||||
|
||||
private final ObjectMapper mapper = new ObjectMapper();
|
||||
private final ConcurrentHashMap<String, String> localDBToRemoteDB = new ConcurrentHashMap<>();
|
||||
@ -179,51 +182,59 @@ public abstract class IdentifierMapping {
|
||||
}
|
||||
|
||||
public String getRemoteDatabaseName(String localDbName) {
|
||||
if (localDBToRemoteDB.isEmpty() || !localDBToRemoteDB.containsKey(localDbName)) {
|
||||
loadDatabaseNamesIfNeeded();
|
||||
}
|
||||
return localDBToRemoteDB.get(localDbName);
|
||||
return getRequiredMapping(localDBToRemoteDB, localDbName, "database", this::loadDatabaseNamesIfNeeded,
|
||||
localDbName);
|
||||
}
|
||||
|
||||
public String getRemoteTableName(String localDbName, String localTableName) {
|
||||
String remoteDbName = getRemoteDatabaseName(localDbName);
|
||||
if (localTableToRemoteTable.isEmpty()
|
||||
|| !localTableToRemoteTable.containsKey(remoteDbName)
|
||||
|| localTableToRemoteTable.get(remoteDbName) == null
|
||||
|| localTableToRemoteTable.get(remoteDbName).isEmpty()
|
||||
|| !localTableToRemoteTable.get(remoteDbName).containsKey(localTableName)
|
||||
|| localTableToRemoteTable.get(remoteDbName).get(localTableName) == null) {
|
||||
loadTableNamesIfNeeded(localDbName);
|
||||
}
|
||||
|
||||
return localTableToRemoteTable.get(remoteDbName).get(localTableName);
|
||||
Map<String, String> tableMap = localTableToRemoteTable.computeIfAbsent(remoteDbName,
|
||||
k -> new ConcurrentHashMap<>());
|
||||
return getRequiredMapping(tableMap, localTableName, "table", () -> loadTableNamesIfNeeded(localDbName),
|
||||
localTableName);
|
||||
}
|
||||
|
||||
public Map<String, String> getRemoteColumnNames(String localDbName, String localTableName) {
|
||||
String remoteDbName = getRemoteDatabaseName(localDbName);
|
||||
String remoteTableName = getRemoteTableName(localDbName, localTableName);
|
||||
if (localColumnToRemoteColumn.isEmpty()
|
||||
|| !localColumnToRemoteColumn.containsKey(remoteDbName)
|
||||
|| localColumnToRemoteColumn.get(remoteDbName) == null
|
||||
|| localColumnToRemoteColumn.get(remoteDbName).isEmpty()
|
||||
|| !localColumnToRemoteColumn.get(remoteDbName).containsKey(remoteTableName)
|
||||
|| localColumnToRemoteColumn.get(remoteDbName).get(remoteTableName) == null
|
||||
|| localColumnToRemoteColumn.get(remoteDbName).get(remoteTableName).isEmpty()) {
|
||||
ConcurrentHashMap<String, ConcurrentHashMap<String, String>> tableColumnMap
|
||||
= localColumnToRemoteColumn.computeIfAbsent(remoteDbName, k -> new ConcurrentHashMap<>());
|
||||
Map<String, String> columnMap = tableColumnMap.computeIfAbsent(remoteTableName, k -> new ConcurrentHashMap<>());
|
||||
if (columnMap.isEmpty()) {
|
||||
LOG.info("Column name mapping missing, loading column names for localDbName: {}, localTableName: {}",
|
||||
localDbName, localTableName);
|
||||
loadColumnNamesIfNeeded(localDbName, localTableName);
|
||||
columnMap = tableColumnMap.get(remoteTableName);
|
||||
}
|
||||
return localColumnToRemoteColumn.get(remoteDbName).get(remoteTableName);
|
||||
if (columnMap.isEmpty()) {
|
||||
LOG.warn("No remote column found for localTableName: {}. Please refresh this catalog.", localTableName);
|
||||
throw new RuntimeException(
|
||||
"No remote column found for localTableName: " + localTableName + ". Please refresh this catalog.");
|
||||
}
|
||||
return columnMap;
|
||||
}
|
||||
|
||||
|
||||
private void loadDatabaseNamesIfNeeded() {
|
||||
if (dbNamesLoaded.compareAndSet(false, true)) {
|
||||
loadDatabaseNames();
|
||||
try {
|
||||
loadDatabaseNames();
|
||||
} catch (Exception e) {
|
||||
dbNamesLoaded.set(false); // Reset on failure
|
||||
LOG.warn("Error loading database names", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void loadTableNamesIfNeeded(String localDbName) {
|
||||
AtomicBoolean isLoaded = tableNamesLoadedMap.computeIfAbsent(localDbName, k -> new AtomicBoolean(false));
|
||||
if (isLoaded.compareAndSet(false, true)) {
|
||||
loadTableNames(localDbName);
|
||||
try {
|
||||
loadTableNames(localDbName);
|
||||
} catch (Exception e) {
|
||||
tableNamesLoadedMap.get(localDbName).set(false); // Reset on failure
|
||||
LOG.warn("Error loading table names for localDbName: {}", localDbName, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -232,10 +243,31 @@ public abstract class IdentifierMapping {
|
||||
AtomicBoolean isLoaded = columnNamesLoadedMap.get(localDbName)
|
||||
.computeIfAbsent(localTableName, k -> new AtomicBoolean(false));
|
||||
if (isLoaded.compareAndSet(false, true)) {
|
||||
loadColumnNames(localDbName, localTableName);
|
||||
try {
|
||||
loadColumnNames(localDbName, localTableName);
|
||||
} catch (Exception e) {
|
||||
columnNamesLoadedMap.get(localDbName).get(localTableName).set(false); // Reset on failure
|
||||
LOG.warn("Error loading column names for localDbName: {}, localTableName: {}", localDbName,
|
||||
localTableName, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private <K, V> V getRequiredMapping(Map<K, V> map, K key, String typeName, Runnable loadIfNeeded,
|
||||
String entityName) {
|
||||
if (map.isEmpty() || !map.containsKey(key) || map.get(key) == null) {
|
||||
LOG.info("{} mapping missing, loading for {}: {}", typeName, typeName, entityName);
|
||||
loadIfNeeded.run();
|
||||
}
|
||||
V value = map.get(key);
|
||||
if (value == null) {
|
||||
LOG.warn("No remote {} found for {}: {}. Please refresh this catalog.", typeName, typeName, entityName);
|
||||
throw new RuntimeException("No remote " + typeName + " found for " + typeName + ": " + entityName
|
||||
+ ". Please refresh this catalog.");
|
||||
}
|
||||
return value;
|
||||
}
|
||||
|
||||
// Load the database name from the data source.
|
||||
// In the corresponding getDatabaseNameList(), setDatabaseNameMapping() must be used to update the mapping.
|
||||
protected abstract void loadDatabaseNames();
|
||||
|
||||
Reference in New Issue
Block a user