From 50e9982729397790fc4f5238cbe18b38348ecfd5 Mon Sep 17 00:00:00 2001 From: zy-kkk Date: Thu, 25 Apr 2024 20:40:25 +0800 Subject: [PATCH] [Enhancement](external catalog) Added status reset when jdbc name mapping is abnormal (#34129) --- .../datasource/mapping/IdentifierMapping.java | 82 +++++++++++++------ 1 file changed, 57 insertions(+), 25 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/mapping/IdentifierMapping.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/mapping/IdentifierMapping.java index cd121f2b63..363ef35115 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/mapping/IdentifierMapping.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/mapping/IdentifierMapping.java @@ -26,6 +26,8 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import java.util.Collections; import java.util.List; @@ -35,6 +37,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; public abstract class IdentifierMapping { + private static final Logger LOG = LogManager.getLogger(IdentifierMapping.class); private final ObjectMapper mapper = new ObjectMapper(); private final ConcurrentHashMap localDBToRemoteDB = new ConcurrentHashMap<>(); @@ -179,51 +182,59 @@ public abstract class IdentifierMapping { } public String getRemoteDatabaseName(String localDbName) { - if (localDBToRemoteDB.isEmpty() || !localDBToRemoteDB.containsKey(localDbName)) { - loadDatabaseNamesIfNeeded(); - } - return localDBToRemoteDB.get(localDbName); + return getRequiredMapping(localDBToRemoteDB, localDbName, "database", this::loadDatabaseNamesIfNeeded, + localDbName); } public String getRemoteTableName(String localDbName, String localTableName) { String remoteDbName = getRemoteDatabaseName(localDbName); - if (localTableToRemoteTable.isEmpty() - || !localTableToRemoteTable.containsKey(remoteDbName) - || localTableToRemoteTable.get(remoteDbName) == null - || localTableToRemoteTable.get(remoteDbName).isEmpty() - || !localTableToRemoteTable.get(remoteDbName).containsKey(localTableName) - || localTableToRemoteTable.get(remoteDbName).get(localTableName) == null) { - loadTableNamesIfNeeded(localDbName); - } - - return localTableToRemoteTable.get(remoteDbName).get(localTableName); + Map tableMap = localTableToRemoteTable.computeIfAbsent(remoteDbName, + k -> new ConcurrentHashMap<>()); + return getRequiredMapping(tableMap, localTableName, "table", () -> loadTableNamesIfNeeded(localDbName), + localTableName); } public Map getRemoteColumnNames(String localDbName, String localTableName) { String remoteDbName = getRemoteDatabaseName(localDbName); String remoteTableName = getRemoteTableName(localDbName, localTableName); - if (localColumnToRemoteColumn.isEmpty() - || !localColumnToRemoteColumn.containsKey(remoteDbName) - || localColumnToRemoteColumn.get(remoteDbName) == null - || localColumnToRemoteColumn.get(remoteDbName).isEmpty() - || !localColumnToRemoteColumn.get(remoteDbName).containsKey(remoteTableName) - || localColumnToRemoteColumn.get(remoteDbName).get(remoteTableName) == null - || localColumnToRemoteColumn.get(remoteDbName).get(remoteTableName).isEmpty()) { + ConcurrentHashMap> tableColumnMap + = localColumnToRemoteColumn.computeIfAbsent(remoteDbName, k -> new ConcurrentHashMap<>()); + Map columnMap = tableColumnMap.computeIfAbsent(remoteTableName, k -> new ConcurrentHashMap<>()); + if (columnMap.isEmpty()) { + LOG.info("Column name mapping missing, loading column names for localDbName: {}, localTableName: {}", + localDbName, localTableName); loadColumnNamesIfNeeded(localDbName, localTableName); + columnMap = tableColumnMap.get(remoteTableName); } - return localColumnToRemoteColumn.get(remoteDbName).get(remoteTableName); + if (columnMap.isEmpty()) { + LOG.warn("No remote column found for localTableName: {}. Please refresh this catalog.", localTableName); + throw new RuntimeException( + "No remote column found for localTableName: " + localTableName + ". Please refresh this catalog."); + } + return columnMap; } + private void loadDatabaseNamesIfNeeded() { if (dbNamesLoaded.compareAndSet(false, true)) { - loadDatabaseNames(); + try { + loadDatabaseNames(); + } catch (Exception e) { + dbNamesLoaded.set(false); // Reset on failure + LOG.warn("Error loading database names", e); + } } } private void loadTableNamesIfNeeded(String localDbName) { AtomicBoolean isLoaded = tableNamesLoadedMap.computeIfAbsent(localDbName, k -> new AtomicBoolean(false)); if (isLoaded.compareAndSet(false, true)) { - loadTableNames(localDbName); + try { + loadTableNames(localDbName); + } catch (Exception e) { + tableNamesLoadedMap.get(localDbName).set(false); // Reset on failure + LOG.warn("Error loading table names for localDbName: {}", localDbName, e); + } } } @@ -232,10 +243,31 @@ public abstract class IdentifierMapping { AtomicBoolean isLoaded = columnNamesLoadedMap.get(localDbName) .computeIfAbsent(localTableName, k -> new AtomicBoolean(false)); if (isLoaded.compareAndSet(false, true)) { - loadColumnNames(localDbName, localTableName); + try { + loadColumnNames(localDbName, localTableName); + } catch (Exception e) { + columnNamesLoadedMap.get(localDbName).get(localTableName).set(false); // Reset on failure + LOG.warn("Error loading column names for localDbName: {}, localTableName: {}", localDbName, + localTableName, e); + } } } + private V getRequiredMapping(Map map, K key, String typeName, Runnable loadIfNeeded, + String entityName) { + if (map.isEmpty() || !map.containsKey(key) || map.get(key) == null) { + LOG.info("{} mapping missing, loading for {}: {}", typeName, typeName, entityName); + loadIfNeeded.run(); + } + V value = map.get(key); + if (value == null) { + LOG.warn("No remote {} found for {}: {}. Please refresh this catalog.", typeName, typeName, entityName); + throw new RuntimeException("No remote " + typeName + " found for " + typeName + ": " + entityName + + ". Please refresh this catalog."); + } + return value; + } + // Load the database name from the data source. // In the corresponding getDatabaseNameList(), setDatabaseNameMapping() must be used to update the mapping. protected abstract void loadDatabaseNames();