[feature](jdbc catalog) support db2 jdbc catalog (#31627)

This commit is contained in:
zy-kkk
2024-03-01 10:21:13 +08:00
committed by yiguolei
parent a912a420a9
commit 07224686ef
25 changed files with 998 additions and 41 deletions

View File

@ -75,6 +75,7 @@ public class JdbcResource extends Resource {
public static final String JDBC_TRINO = "jdbc:trino";
public static final String JDBC_PRESTO = "jdbc:presto";
public static final String JDBC_OCEANBASE = "jdbc:oceanbase";
public static final String JDBC_DB2 = "jdbc:db2";
public static final String NEBULA = "NEBULA";
public static final String MYSQL = "MYSQL";
@ -87,6 +88,7 @@ public class JdbcResource extends Resource {
public static final String PRESTO = "PRESTO";
public static final String OCEANBASE = "OCEANBASE";
public static final String OCEANBASE_ORACLE = "OCEANBASE_ORACLE";
public static final String DB2 = "DB2";
public static final String JDBC_PROPERTIES_PREFIX = "jdbc.";
public static final String JDBC_URL = "jdbc_url";
@ -327,6 +329,8 @@ public class JdbcResource extends Resource {
return OCEANBASE;
} else if (url.startsWith(JDBC_NEBULA)) {
return NEBULA;
} else if (url.startsWith(JDBC_DB2)) {
return DB2;
}
throw new DdlException("Unsupported jdbc database type, please check jdbcUrl: " + url);
}
@ -416,7 +420,7 @@ public class JdbcResource extends Resource {
}
private static String getDelimiter(String jdbcUrl, String dbType) {
if (dbType.equals(SQLSERVER)) {
if (dbType.equals(SQLSERVER) || dbType.equals(DB2)) {
return ";";
} else if (jdbcUrl.contains("?")) {
return "&";

View File

@ -103,6 +103,7 @@ public class JdbcTable extends Table {
tempMap.put("presto", TOdbcTableType.PRESTO);
tempMap.put("oceanbase", TOdbcTableType.OCEANBASE);
tempMap.put("oceanbase_oracle", TOdbcTableType.OCEANBASE_ORACLE);
tempMap.put("db2", TOdbcTableType.DB2);
TABLE_TYPE_MAP = Collections.unmodifiableMap(tempMap);
}
@ -492,6 +493,7 @@ public class JdbcTable extends Table {
case SAP_HANA:
return formatName(name, "\"", "\"", false, false);
case ORACLE:
case DB2:
return formatName(name, "\"", "\"", true, false);
default:
return name;
@ -512,6 +514,7 @@ public class JdbcTable extends Table {
case OCEANBASE_ORACLE:
case ORACLE:
case SAP_HANA:
case DB2:
return formatNameWithRemoteName(remoteName, "\"", "\"");
default:
return remoteName;

View File

@ -84,6 +84,8 @@ public abstract class JdbcClient {
case JdbcResource.TRINO:
case JdbcResource.PRESTO:
return new JdbcTrinoClient(jdbcClientConfig);
case JdbcResource.DB2:
return new JdbcDB2Client(jdbcClientConfig);
default:
throw new IllegalArgumentException("Unsupported DB type: " + dbType);
}

View File

@ -0,0 +1,83 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.datasource.jdbc.client;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.Type;
public class JdbcDB2Client extends JdbcClient {
protected JdbcDB2Client(JdbcClientConfig jdbcClientConfig) {
super(jdbcClientConfig);
}
@Override
protected String getDatabaseQuery() {
return "SELECT schemaname FROM syscat.schemata WHERE DEFINER = CURRENT USER;";
}
@Override
protected Type jdbcTypeToDoris(JdbcFieldSchema fieldSchema) {
String db2Type = fieldSchema.getDataTypeName();
switch (db2Type) {
case "SMALLINT":
return Type.SMALLINT;
case "INTEGER":
return Type.INT;
case "BIGINT":
return Type.BIGINT;
case "DECFLOAT":
case "DECIMAL": {
int precision = fieldSchema.getColumnSize();
int scale = fieldSchema.getDecimalDigits();
return createDecimalOrStringType(precision, scale);
}
case "DOUBLE":
return Type.DOUBLE;
case "REAL":
return Type.FLOAT;
case "CHAR":
ScalarType charType = ScalarType.createType(PrimitiveType.CHAR);
charType.setLength(fieldSchema.columnSize);
return charType;
case "VARCHAR":
case "LONG VARCHAR":
ScalarType varcharType = ScalarType.createType(PrimitiveType.VARCHAR);
varcharType.setLength(fieldSchema.columnSize);
return varcharType;
case "DATE":
return ScalarType.createDateV2Type();
case "TIMESTAMP": {
// postgres can support microsecond
int scale = fieldSchema.getDecimalDigits();
if (scale > 6) {
scale = 6;
}
return ScalarType.createDatetimeV2Type(scale);
}
case "TIME":
case "CLOB":
case "VARGRAPHIC":
case "LONG VARGRAPHIC":
return ScalarType.createStringType();
default:
return Type.UNSUPPORTED;
}
}
}