[feature](multi-catalog) Rename multi-catalog config 'specified_database_list' to 'include_database_list', and introduce new multi-catalog config 'exclude_database_list' (#18834)
In my scene, We need to specify databases that are excluded to synchronize to doris, like some databases store temporary table. Since #17803 introduce `specified_database_list` to specify 'include databases', this pr introduce new config `exclude_database_list` to specify 'exclude databases', and rename `specified_database_list` to `include_database_list` for naming symmetry. BTW, when `include_database_list` and `exclude_database_list` specify overlapping databases, `exclude_database_list` would take effect with higher privilege over `include_database_list`.
This commit is contained in:
@ -101,14 +101,16 @@ public class JdbcResource extends Resource {
|
||||
TYPE,
|
||||
ONLY_SPECIFIED_DATABASE,
|
||||
LOWER_CASE_TABLE_NAMES,
|
||||
SPECIFIED_DATABASE_LIST,
|
||||
OCEANBASE_MODE
|
||||
OCEANBASE_MODE,
|
||||
INCLUDE_DATABASE_LIST,
|
||||
EXCLUDE_DATABASE_LIST
|
||||
).build();
|
||||
private static final ImmutableList<String> OPTIONAL_PROPERTIES = new ImmutableList.Builder<String>().add(
|
||||
ONLY_SPECIFIED_DATABASE,
|
||||
LOWER_CASE_TABLE_NAMES,
|
||||
SPECIFIED_DATABASE_LIST,
|
||||
OCEANBASE_MODE
|
||||
OCEANBASE_MODE,
|
||||
INCLUDE_DATABASE_LIST,
|
||||
EXCLUDE_DATABASE_LIST
|
||||
).build();
|
||||
|
||||
// The default value of optional properties
|
||||
@ -118,8 +120,9 @@ public class JdbcResource extends Resource {
|
||||
static {
|
||||
OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(ONLY_SPECIFIED_DATABASE, "false");
|
||||
OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(LOWER_CASE_TABLE_NAMES, "false");
|
||||
OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(SPECIFIED_DATABASE_LIST, "");
|
||||
OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(OCEANBASE_MODE, "");
|
||||
OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(INCLUDE_DATABASE_LIST, "");
|
||||
OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(EXCLUDE_DATABASE_LIST, "");
|
||||
}
|
||||
|
||||
// timeout for both connection and read. 10 seconds is long enough.
|
||||
|
||||
@ -46,7 +46,8 @@ import java.util.stream.Collectors;
|
||||
public abstract class Resource implements Writable, GsonPostProcessable {
|
||||
private static final Logger LOG = LogManager.getLogger(OdbcCatalogResource.class);
|
||||
public static final String REFERENCE_SPLIT = "@";
|
||||
public static final String SPECIFIED_DATABASE_LIST = "specified_database_list";
|
||||
public static final String INCLUDE_DATABASE_LIST = "include_database_list";
|
||||
public static final String EXCLUDE_DATABASE_LIST = "exclude_database_list";
|
||||
|
||||
public enum ResourceType {
|
||||
UNKNOWN,
|
||||
|
||||
@ -449,8 +449,16 @@ public abstract class ExternalCatalog implements CatalogIf<ExternalDatabase>, Wr
|
||||
throw new NotImplementedException("createDatabase not implemented");
|
||||
}
|
||||
|
||||
public Map getSpecifiedDatabaseMap() {
|
||||
String specifiedDatabaseList = catalogProperty.getOrDefault(Resource.SPECIFIED_DATABASE_LIST, "");
|
||||
public Map getIncludeDatabaseMap() {
|
||||
return getSpecifiedDatabaseMap(Resource.INCLUDE_DATABASE_LIST);
|
||||
}
|
||||
|
||||
public Map getExcludeDatabaseMap() {
|
||||
return getSpecifiedDatabaseMap(Resource.EXCLUDE_DATABASE_LIST);
|
||||
}
|
||||
|
||||
public Map getSpecifiedDatabaseMap(String catalogPropertyKey) {
|
||||
String specifiedDatabaseList = catalogProperty.getOrDefault(catalogPropertyKey, "");
|
||||
Map<String, Boolean> specifiedDatabaseMap = Maps.newHashMap();
|
||||
specifiedDatabaseList = specifiedDatabaseList.trim();
|
||||
if (specifiedDatabaseList.isEmpty()) {
|
||||
|
||||
@ -130,10 +130,15 @@ public class HMSExternalCatalog extends ExternalCatalog {
|
||||
initCatalogLog.setCatalogId(id);
|
||||
initCatalogLog.setType(InitCatalogLog.Type.HMS);
|
||||
List<String> allDatabases = client.getAllDatabases();
|
||||
Map<String, Boolean> specifiedDatabaseMap = getSpecifiedDatabaseMap();
|
||||
Map<String, Boolean> includeDatabaseMap = getIncludeDatabaseMap();
|
||||
Map<String, Boolean> excludeDatabaseMap = getExcludeDatabaseMap();
|
||||
// Update the db name to id map.
|
||||
for (String dbName : allDatabases) {
|
||||
if (!specifiedDatabaseMap.isEmpty() && specifiedDatabaseMap.get(dbName) == null) {
|
||||
// Exclude database map take effect with higher priority over include database map
|
||||
if (!excludeDatabaseMap.isEmpty() && excludeDatabaseMap.containsKey(dbName)) {
|
||||
continue;
|
||||
}
|
||||
if (!includeDatabaseMap.isEmpty() && includeDatabaseMap.containsKey(dbName)) {
|
||||
continue;
|
||||
}
|
||||
long dbId;
|
||||
|
||||
@ -129,18 +129,15 @@ public class JdbcExternalCatalog extends ExternalCatalog {
|
||||
return catalogProperty.getOrDefault(JdbcResource.LOWER_CASE_TABLE_NAMES, "false");
|
||||
}
|
||||
|
||||
public String getSpecifiedDatabaseList() {
|
||||
return catalogProperty.getOrDefault(JdbcResource.SPECIFIED_DATABASE_LIST, "");
|
||||
}
|
||||
|
||||
public String getOceanBaseMode() {
|
||||
return catalogProperty.getOrDefault(JdbcResource.OCEANBASE_MODE, "");
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void initLocalObjectsImpl() {
|
||||
jdbcClient = new JdbcClient(getJdbcUser(), getJdbcPasswd(), getJdbcUrl(), getDriverUrl(), getDriverClass(),
|
||||
getOnlySpecifiedDatabase(), getLowerCaseTableNames(), getSpecifiedDatabaseMap(), getOceanBaseMode());
|
||||
jdbcClient = new JdbcClient(getJdbcUser(), getJdbcPasswd(), getJdbcUrl(), getDriverUrl(),
|
||||
getDriverClass(), getOnlySpecifiedDatabase(), getLowerCaseTableNames(),
|
||||
getOceanBaseMode(), getIncludeDatabaseMap(), getExcludeDatabaseMap());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@ -67,9 +67,14 @@ public abstract class IcebergExternalCatalog extends ExternalCatalog {
|
||||
initCatalogLog.setCatalogId(id);
|
||||
initCatalogLog.setType(InitCatalogLog.Type.ICEBERG);
|
||||
List<String> allDatabaseNames = listDatabaseNames();
|
||||
Map<String, Boolean> specifiedDatabaseMap = getSpecifiedDatabaseMap();
|
||||
Map<String, Boolean> includeDatabaseMap = getIncludeDatabaseMap();
|
||||
Map<String, Boolean> excludeDatabaseMap = getExcludeDatabaseMap();
|
||||
for (String dbName : allDatabaseNames) {
|
||||
if (!specifiedDatabaseMap.isEmpty() && specifiedDatabaseMap.get(dbName) == null) {
|
||||
// Exclude database map take effect with higher priority over include database map
|
||||
if (!excludeDatabaseMap.isEmpty() && excludeDatabaseMap.containsKey(dbName)) {
|
||||
continue;
|
||||
}
|
||||
if (!includeDatabaseMap.isEmpty() && includeDatabaseMap.containsKey(dbName)) {
|
||||
continue;
|
||||
}
|
||||
long dbId;
|
||||
|
||||
@ -61,7 +61,8 @@ public class JdbcClient {
|
||||
|
||||
private boolean isLowerCaseTableNames = false;
|
||||
|
||||
private Map<String, Boolean> specifiedDatabaseMap = Maps.newHashMap();
|
||||
private Map<String, Boolean> includeDatabaseMap = Maps.newHashMap();
|
||||
private Map<String, Boolean> excludeDatabaseMap = Maps.newHashMap();
|
||||
|
||||
// only used when isLowerCaseTableNames = true.
|
||||
private Map<String, String> lowerTableToRealTable = Maps.newHashMap();
|
||||
@ -69,13 +70,16 @@ public class JdbcClient {
|
||||
private String oceanbaseMode = "";
|
||||
|
||||
public JdbcClient(String user, String password, String jdbcUrl, String driverUrl, String driverClass,
|
||||
String onlySpecifiedDatabase, String isLowerCaseTableNames, Map specifiedDatabaseMap,
|
||||
String oceanbaseMode) {
|
||||
String onlySpecifiedDatabase, String isLowerCaseTableNames, String oceanbaseMode, Map includeDatabaseMap,
|
||||
Map excludeDatabaseMap) {
|
||||
this.jdbcUser = user;
|
||||
this.isOnlySpecifiedDatabase = Boolean.valueOf(onlySpecifiedDatabase).booleanValue();
|
||||
this.isLowerCaseTableNames = Boolean.valueOf(isLowerCaseTableNames).booleanValue();
|
||||
if (specifiedDatabaseMap != null) {
|
||||
this.specifiedDatabaseMap = specifiedDatabaseMap;
|
||||
if (includeDatabaseMap != null) {
|
||||
this.includeDatabaseMap = includeDatabaseMap;
|
||||
}
|
||||
if (excludeDatabaseMap != null) {
|
||||
this.excludeDatabaseMap = excludeDatabaseMap;
|
||||
}
|
||||
this.oceanbaseMode = oceanbaseMode;
|
||||
try {
|
||||
@ -180,7 +184,7 @@ public class JdbcClient {
|
||||
Connection conn = getConnection();
|
||||
Statement stmt = null;
|
||||
ResultSet rs = null;
|
||||
if (isOnlySpecifiedDatabase && specifiedDatabaseMap.isEmpty()) {
|
||||
if (isOnlySpecifiedDatabase && includeDatabaseMap.isEmpty() && excludeDatabaseMap.isEmpty()) {
|
||||
return getSpecifiedDatabase(conn);
|
||||
}
|
||||
List<String> databaseNames = Lists.newArrayList();
|
||||
@ -216,11 +220,16 @@ public class JdbcClient {
|
||||
while (rs.next()) {
|
||||
tempDatabaseNames.add(rs.getString(1));
|
||||
}
|
||||
if (isOnlySpecifiedDatabase && !specifiedDatabaseMap.isEmpty()) {
|
||||
if (isOnlySpecifiedDatabase) {
|
||||
for (String db : tempDatabaseNames) {
|
||||
if (specifiedDatabaseMap.get(db) != null) {
|
||||
databaseNames.add(db);
|
||||
// Exclude database map take effect with higher priority over include database map
|
||||
if (!excludeDatabaseMap.isEmpty() && excludeDatabaseMap.containsKey(db)) {
|
||||
continue;
|
||||
}
|
||||
if (!includeDatabaseMap.isEmpty() && includeDatabaseMap.containsKey(db)) {
|
||||
continue;
|
||||
}
|
||||
databaseNames.add(db);
|
||||
}
|
||||
} else {
|
||||
databaseNames = tempDatabaseNames;
|
||||
|
||||
Reference in New Issue
Block a user