[feature](multi-catalog) support postgresql jdbc catalog (#15570)

support postgresql jdbc catalog
This commit is contained in:
Tiewei Fang
2023-01-06 11:00:59 +08:00
committed by GitHub
parent b57500d0c3
commit df2da89b89
17 changed files with 2611 additions and 44 deletions

View File

@ -67,7 +67,7 @@ public class JdbcResource extends Resource {
public static final String DRIVER_CLASS = "driver_class";
public static final String DRIVER_URL = "driver_url";
public static final String TYPE = "type";
private static final String CHECK_SUM = "checksum";
public static final String CHECK_SUM = "checksum";
// timeout for both connection and read. 10 seconds is long enough.
private static final int HTTP_TIMEOUT_MS = 10000;

View File

@ -50,7 +50,7 @@ public class JdbcExternalCatalog extends ExternalCatalog {
private String jdbcUrl;
private String driverUrl;
private String driverClass;
private String checkSum;
private String checkSum = "";
public JdbcExternalCatalog(
long catalogId, String name, String resource, Map<String, String> props) throws DdlException {
@ -84,6 +84,7 @@ public class JdbcExternalCatalog extends ExternalCatalog {
properties.put(JdbcResource.JDBC_URL, jdbcUrl);
driverUrl = properties.getOrDefault(JdbcResource.DRIVER_URL, "");
driverClass = properties.getOrDefault(JdbcResource.DRIVER_CLASS, "");
checkSum = properties.getOrDefault(JdbcResource.CHECK_SUM, "");
return properties;
}
@ -119,7 +120,9 @@ public class JdbcExternalCatalog extends ExternalCatalog {
protected void initLocalObjectsImpl() {
jdbcClient = new JdbcClient(jdbcUser, jdbcPasswd, jdbcUrl, driverUrl, driverClass);
databaseTypeName = jdbcClient.getDbType();
checkSum = jdbcClient.getCheckSum();
if (checkSum.isEmpty()) {
checkSum = jdbcClient.getCheckSum();
}
}
@Override

View File

@ -52,9 +52,9 @@ import java.util.List;
public class JdbcClient {
private static final Logger LOG = LogManager.getLogger(JdbcClient.class);
private static final String MYSQL = "MYSQL";
private static final String POSTGRESQL = "POSTGRESQL";
// private static final String ORACLE = "ORACLE";
// private static final String SQLSERVER = "SQLSERVER";
// private static final String POSTGRESQL = "POSTGRESQL";
private static final int HTTP_TIMEOUT_MS = 10000;
private String dbType;
@ -108,15 +108,14 @@ public class JdbcClient {
public String parseDbType(String url) {
if (url.startsWith("jdbc:mysql") || url.startsWith("jdbc:mariadb")) {
return MYSQL;
} else if (url.startsWith("jdbc:postgresql")) {
return POSTGRESQL;
}
// else if (url.startsWith("jdbc:oracle")) {
// return ORACLE;
// }
// else if (url.startsWith("jdbc:sqlserver")) {
// return SQLSERVER;
// } else if (url.startsWith("jdbc:postgresql")) {
// return POSTGRESQL;
// }
throw new JdbcClientException("Unsupported jdbc database type, please check jdbcUrl: " + jdbcUrl);
}
@ -181,7 +180,18 @@ public class JdbcClient {
List<String> databaseNames = Lists.newArrayList();
try {
stmt = conn.createStatement();
rs = stmt.executeQuery("SHOW DATABASES");
switch (dbType) {
case MYSQL:
rs = stmt.executeQuery("SHOW DATABASES");
break;
case POSTGRESQL:
rs = stmt.executeQuery("SELECT schema_name FROM information_schema.schemata "
+ "where schema_owner='" + jdbcUser + "';");
break;
default:
throw new JdbcClientException("Not supported jdbc type");
}
while (rs.next()) {
databaseNames.add(rs.getString(1));
}
@ -207,6 +217,9 @@ public class JdbcClient {
case MYSQL:
rs = databaseMetaData.getTables(dbName, null, null, types);
break;
case POSTGRESQL:
rs = databaseMetaData.getTables(null, dbName, null, types);
break;
default:
throw new JdbcClientException("Unknown database type");
}
@ -292,7 +305,16 @@ public class JdbcClient {
// Can contain single-character wildcards ("_"), or multi-character wildcards ("%")
// columnNamePattern - column name, `null` means get all columns
// Can contain single-character wildcards ("_"), or multi-character wildcards ("%")
rs = databaseMetaData.getColumns(dbName, null, tableName, null);
switch (dbType) {
case MYSQL:
rs = databaseMetaData.getColumns(dbName, null, tableName, null);
break;
case POSTGRESQL:
rs = databaseMetaData.getColumns(null, dbName, tableName, null);
break;
default:
throw new JdbcClientException("Unknown database type");
}
while (rs.next()) {
JdbcFieldSchema field = new JdbcFieldSchema();
field.setColumnName(rs.getString("COLUMN_NAME"));
@ -318,6 +340,8 @@ public class JdbcClient {
switch (dbType) {
case MYSQL:
return mysqlTypeToDoris(fieldSchema);
case POSTGRESQL:
return postgresqlTypeToDoris(fieldSchema);
default:
throw new JdbcClientException("Unknown database type");
}
@ -373,7 +397,6 @@ public class JdbcClient {
case "DATE":
return ScalarType.getDefaultDateType(Type.DATE);
case "TIMESTAMP":
return ScalarType.getDefaultDateType(Type.DATETIME);
case "DATETIME":
return ScalarType.getDefaultDateType(Type.DATETIME);
case "FLOAT":
@ -418,7 +441,72 @@ public class JdbcClient {
return ScalarType.createStringType();
default:
throw new JdbcClientException("Can not convert mysql data type to doris data type for type ["
+ mysqlType + "]");
+ mysqlType + "]");
}
}
public Type postgresqlTypeToDoris(JdbcFieldSchema fieldSchema) {
String pgType = fieldSchema.getDataTypeName();
switch (pgType) {
case "int2":
case "smallserial":
return Type.SMALLINT;
case "int4":
case "serial":
return Type.INT;
case "int8":
case "bigserial":
return Type.BIGINT;
case "numeric": {
int precision = fieldSchema.getColumnSize();
int scale = fieldSchema.getDecimalDigits();
if (precision <= ScalarType.MAX_DECIMAL128_PRECISION) {
if (!Config.enable_decimal_conversion && precision > ScalarType.MAX_DECIMALV2_PRECISION) {
return ScalarType.createStringType();
}
return ScalarType.createDecimalType(precision, scale);
} else {
return ScalarType.createStringType();
}
}
case "float4":
return Type.FLOAT;
case "float8":
return Type.DOUBLE;
case "bpchar":
ScalarType charType = ScalarType.createType(PrimitiveType.CHAR);
charType.setLength(fieldSchema.columnSize);
return charType;
case "timestamp":
case "timestamptz":
return ScalarType.getDefaultDateType(Type.DATETIME);
case "date":
return ScalarType.getDefaultDateType(Type.DATE);
case "bool":
return Type.BOOLEAN;
case "bit":
case "point":
case "line":
case "lseg":
case "box":
case "path":
case "polygon":
case "circle":
case "varchar":
case "text":
case "time":
case "timetz":
case "interval":
case "cidr":
case "inet":
case "macaddr":
case "varbit":
case "jsonb":
case "uuid":
return ScalarType.createStringType();
default:
throw new JdbcClientException("Can not convert postgresql data type to doris data type for type ["
+ pgType + "]");
}
}

View File

@ -2313,6 +2313,9 @@ public class ShowExecutor {
public void handleShowCatalogs() throws AnalysisException {
ShowCatalogStmt showStmt = (ShowCatalogStmt) stmt;
if (ctx.getCurrentCatalog() == null) {
throw new AnalysisException("Current catalog is not exist, please switch catalog.");
}
resultSet = Env.getCurrentEnv().getCatalogMgr().showCatalogs(showStmt, ctx.getCurrentCatalog().getName());
}