[feature](jdbc catalog) support gbase jdbc catalog #41027 #41587 (#42123)

cherry pick from #41027 #41587

---------

Co-authored-by: zy-kkk <zhongyk10@gmail.com>
This commit is contained in:
Rayner Chen
2024-10-21 16:52:23 +08:00
committed by GitHub
parent a32ad0b1f7
commit bbd4970ed8
10 changed files with 436 additions and 2 deletions

View File

@ -0,0 +1,89 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.jdbc;
import org.apache.doris.common.jni.vec.ColumnType;
import org.apache.doris.common.jni.vec.ColumnType.Type;
import org.apache.doris.common.jni.vec.ColumnValueConverter;
import java.math.BigDecimal;
import java.sql.Date;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.Objects;
public class GbaseJdbcExecutor extends BaseJdbcExecutor {
public GbaseJdbcExecutor(byte[] thriftParams) throws Exception {
super(thriftParams);
}
@Override
protected Object getColumnValue(int columnIndex, ColumnType type, String[] replaceStringList) throws SQLException {
switch (type.getType()) {
case TINYINT:
byte tinyIntVal = resultSet.getByte(columnIndex + 1);
return resultSet.wasNull() ? null : tinyIntVal;
case SMALLINT:
short smallIntVal = resultSet.getShort(columnIndex + 1);
return resultSet.wasNull() ? null : smallIntVal;
case INT:
int intVal = resultSet.getInt(columnIndex + 1);
return resultSet.wasNull() ? null : intVal;
case BIGINT:
long bigIntVal = resultSet.getLong(columnIndex + 1);
return resultSet.wasNull() ? null : bigIntVal;
case FLOAT:
float floatVal = resultSet.getFloat(columnIndex + 1);
return resultSet.wasNull() ? null : floatVal;
case DOUBLE:
double doubleVal = resultSet.getDouble(columnIndex + 1);
return resultSet.wasNull() ? null : doubleVal;
case DECIMALV2:
case DECIMAL32:
case DECIMAL64:
case DECIMAL128:
BigDecimal decimalVal = resultSet.getBigDecimal(columnIndex + 1);
return resultSet.wasNull() ? null : decimalVal;
case DATE:
case DATEV2:
Date dateVal = resultSet.getDate(columnIndex + 1);
return resultSet.wasNull() ? null : dateVal.toLocalDate();
case DATETIME:
case DATETIMEV2:
Timestamp timestampVal = resultSet.getTimestamp(columnIndex + 1);
return resultSet.wasNull() ? null : timestampVal.toLocalDateTime();
case CHAR:
case VARCHAR:
case STRING:
String stringVal = (String) resultSet.getObject(columnIndex + 1);
return resultSet.wasNull() ? null : stringVal;
default:
throw new IllegalArgumentException("Unsupported column type: " + type.getType());
}
}
@Override
protected ColumnValueConverter getOutputConverter(ColumnType columnType, String replaceString) {
if (Objects.requireNonNull(columnType.getType()) == Type.CHAR) {
return createConverter(
input -> trimSpaces(input.toString()), String.class);
}
return null;
}
}

View File

@ -41,6 +41,8 @@ public class JdbcExecutorFactory {
case TRINO:
case PRESTO:
return "org/apache/doris/jdbc/TrinoJdbcExecutor";
case GBASE:
return "org/apache/doris/jdbc/GbaseJdbcExecutor";
default:
throw new IllegalArgumentException("Unsupported jdbc type: " + type);
}

View File

@ -76,6 +76,7 @@ public class JdbcResource extends Resource {
public static final String JDBC_PRESTO = "jdbc:presto";
public static final String JDBC_OCEANBASE = "jdbc:oceanbase";
public static final String JDBC_DB2 = "jdbc:db2";
public static final String JDBC_GBASE = "jdbc:gbase";
public static final String MYSQL = "MYSQL";
public static final String POSTGRESQL = "POSTGRESQL";
@ -88,6 +89,7 @@ public class JdbcResource extends Resource {
public static final String OCEANBASE = "OCEANBASE";
public static final String OCEANBASE_ORACLE = "OCEANBASE_ORACLE";
public static final String DB2 = "DB2";
public static final String GBASE = "GBASE";
public static final String JDBC_PROPERTIES_PREFIX = "jdbc.";
public static final String JDBC_URL = "jdbc_url";
@ -323,6 +325,8 @@ public class JdbcResource extends Resource {
return OCEANBASE;
} else if (url.startsWith(JDBC_DB2)) {
return DB2;
} else if (url.startsWith(JDBC_GBASE)) {
return GBASE;
}
throw new DdlException("Unsupported jdbc database type, please check jdbcUrl: " + url);
}

View File

@ -103,6 +103,7 @@ public class JdbcTable extends Table {
tempMap.put("oceanbase", TOdbcTableType.OCEANBASE);
tempMap.put("oceanbase_oracle", TOdbcTableType.OCEANBASE_ORACLE);
tempMap.put("db2", TOdbcTableType.DB2);
tempMap.put("gbase", TOdbcTableType.GBASE);
TABLE_TYPE_MAP = Collections.unmodifiableMap(tempMap);
}
@ -481,6 +482,7 @@ public class JdbcTable extends Table {
switch (tableType) {
case MYSQL:
case OCEANBASE:
case GBASE:
return formatName(name, "`", "`", false, false);
case SQLSERVER:
return formatName(name, "[", "]", false, false);
@ -503,6 +505,7 @@ public class JdbcTable extends Table {
switch (tableType) {
case MYSQL:
case OCEANBASE:
case GBASE:
return formatNameWithRemoteName(remoteName, "`", "`");
case SQLSERVER:
return formatNameWithRemoteName(remoteName, "[", "]");

View File

@ -91,6 +91,8 @@ public abstract class JdbcClient {
return new JdbcTrinoClient(jdbcClientConfig);
case JdbcResource.DB2:
return new JdbcDB2Client(jdbcClientConfig);
case JdbcResource.GBASE:
return new JdbcGbaseClient(jdbcClientConfig);
default:
throw new IllegalArgumentException("Unsupported DB type: " + dbType);
}

View File

@ -0,0 +1,158 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.datasource.jdbc.client;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.jdbc.util.JdbcFieldSchema;
import com.google.common.collect.Lists;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Types;
import java.util.List;
import java.util.function.Consumer;
public class JdbcGbaseClient extends JdbcClient {
protected JdbcGbaseClient(JdbcClientConfig jdbcClientConfig) {
super(jdbcClientConfig);
}
@Override
public List<String> getDatabaseNameList() {
Connection conn = getConnection();
ResultSet rs = null;
List<String> remoteDatabaseNames = Lists.newArrayList();
try {
if (isOnlySpecifiedDatabase && includeDatabaseMap.isEmpty() && excludeDatabaseMap.isEmpty()) {
String currentDatabase = conn.getCatalog();
remoteDatabaseNames.add(currentDatabase);
} else {
rs = conn.getMetaData().getCatalogs();
while (rs.next()) {
remoteDatabaseNames.add(rs.getString("TABLE_CAT"));
}
}
} catch (SQLException e) {
throw new JdbcClientException("failed to get database name list from jdbc", e);
} finally {
close(rs, conn);
}
return filterDatabaseNames(remoteDatabaseNames);
}
@Override
protected void processTable(String remoteDbName, String remoteTableName, String[] tableTypes,
Consumer<ResultSet> resultSetConsumer) {
Connection conn = null;
ResultSet rs = null;
try {
conn = super.getConnection();
DatabaseMetaData databaseMetaData = conn.getMetaData();
rs = databaseMetaData.getTables(remoteDbName, null, remoteTableName, tableTypes);
resultSetConsumer.accept(rs);
} catch (SQLException e) {
throw new JdbcClientException("Failed to process table", e);
} finally {
close(rs, conn);
}
}
@Override
protected ResultSet getRemoteColumns(DatabaseMetaData databaseMetaData, String catalogName, String remoteDbName,
String remoteTableName) throws SQLException {
return databaseMetaData.getColumns(remoteDbName, null, remoteTableName, null);
}
@Override
public List<JdbcFieldSchema> getJdbcColumnsInfo(String localDbName, String localTableName) {
Connection conn = getConnection();
ResultSet rs = null;
List<JdbcFieldSchema> tableSchema = Lists.newArrayList();
String remoteDbName = getRemoteDatabaseName(localDbName);
String remoteTableName = getRemoteTableName(localDbName, localTableName);
try {
DatabaseMetaData databaseMetaData = conn.getMetaData();
String catalogName = getCatalogName(conn);
rs = getRemoteColumns(databaseMetaData, catalogName, remoteDbName, remoteTableName);
while (rs.next()) {
JdbcFieldSchema field = new JdbcFieldSchema(rs);
tableSchema.add(field);
}
} catch (SQLException e) {
throw new JdbcClientException("failed to get jdbc columns info for remote table `%s.%s`: %s",
remoteDbName, remoteTableName, Util.getRootCauseMessage(e));
} finally {
close(rs, conn);
}
return tableSchema;
}
@Override
protected Type jdbcTypeToDoris(JdbcFieldSchema fieldSchema) {
switch (fieldSchema.getDataType()) {
case Types.TINYINT:
return Type.TINYINT;
case Types.SMALLINT:
return Type.SMALLINT;
case Types.INTEGER:
return Type.INT;
case Types.BIGINT:
return Type.BIGINT;
case Types.FLOAT:
case Types.REAL:
return Type.FLOAT;
case Types.DOUBLE:
return Type.DOUBLE;
case Types.NUMERIC:
case Types.DECIMAL: {
int precision = fieldSchema.getColumnSize()
.orElseThrow(() -> new IllegalArgumentException("Precision not present"));
int scale = fieldSchema.getDecimalDigits()
.orElseThrow(() -> new JdbcClientException("Scale not present"));
return createDecimalOrStringType(precision, scale);
}
case Types.DATE:
return Type.DATEV2;
case Types.TIMESTAMP: {
int scale = fieldSchema.getDecimalDigits().orElse(0);
if (scale > 6) {
scale = 6;
}
return ScalarType.createDatetimeV2Type(scale);
}
case Types.CHAR:
ScalarType charType = ScalarType.createType(PrimitiveType.CHAR);
charType.setLength(fieldSchema.getColumnSize()
.orElseThrow(() -> new IllegalArgumentException("Length not present")));
return charType;
case Types.TIME:
case Types.VARCHAR:
case Types.LONGVARCHAR:
return ScalarType.createStringType();
default:
return Type.UNSUPPORTED;
}
}
}

View File

@ -202,7 +202,8 @@ public class JdbcScanNode extends ExternalScanNode {
|| jdbcType == TOdbcTableType.SAP_HANA
|| jdbcType == TOdbcTableType.TRINO
|| jdbcType == TOdbcTableType.PRESTO
|| jdbcType == TOdbcTableType.OCEANBASE)) {
|| jdbcType == TOdbcTableType.OCEANBASE
|| jdbcType == TOdbcTableType.GBASE)) {
sql.append(" LIMIT ").append(limit);
}

View File

@ -401,7 +401,8 @@ enum TOdbcTableType {
OCEANBASE,
OCEANBASE_ORACLE,
NEBULA, // Deprecated
DB2
DB2,
GBASE
}
struct TJdbcExecutorCtorParams {

File diff suppressed because one or more lines are too long

View File

@ -0,0 +1,143 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
suite("test_gbase_jdbc_catalog", "p0,external,gbase,external_docker,external_docker_gbase") {
// Because there is no Gbase Dcoker test environment, the test case will not be executed for the time being.
// Gbase ddl
// CREATE TABLE gbase_test (
// tinyint_col TINYINT,
// smallint_col SMALLINT,
// int_col INT,
// bigint_col BIGINT,
// float_col FLOAT,
// double_col DOUBLE,
// decimal_col DECIMAL(10, 2),
// numeric_col NUMERIC(10, 2),
// char_col CHAR(255),
// varchar_col VARCHAR(10922),
// text_col TEXT,
// date_col DATE,
// datetime_col DATETIME,
// time_col TIME,
// timestamp_col TIMESTAMP
// );
// INSERT INTO gbase_test VALUES (
// 1, -- tinyint_col
// 18, -- smallint_col
// 100, -- int_col
// 500000, -- bigint_col
// 1.75, -- float_col
// 70.5, -- double_col
// 12345.67, -- decimal_col
// 100.00, -- numeric_col
// 'John Doe', -- char_col
// 'A description',-- varchar_col
// 'Detailed text data', -- text_col
// '2023-09-19', -- date_col
// '2023-09-19 12:34:56', -- datetime_col
// '12:00:00', -- time_col
// '2023-09-19 14:45:00' -- timestamp_col
// );
//
//
// INSERT INTO gbase_test VALUES (
// NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL
// );
// INSERT INTO gbase_test VALUES (
// -127, -- tinyint_col (min value)
// -32767, -- smallint_col (min value)
// -2147483647, -- int_col (min value)
// -92233720368, -- bigint_col (min value)
// -3.40E+38, -- float_col (min value)
// -1.7976931348623157E+308, -- double_col (min value)
// -(1E+9 - 1)/(1E+2), -- decimal_col (min value)
// -(1E+9 - 1)/(1E+2), -- numeric_col (min value)
// '', -- char_col (empty string,min value)
// '', -- varchar_col (empty string,min value)
// '', -- text_col (empty string,min value)
// '0001-01-01', -- date_col (min value)
// '0001-01-01 00:00:00.000000',-- datetime_col (min value)
// '-838:59:59', -- time_col (min value)
// '1970-01-01 08:00:01' -- timestamp_col (min value)
// );
// INSERT INTO gbase_test VALUES (
// 127, -- tinyint_col (max value)
// 32767, -- smallint_col (max value)
// 2147483647, -- int_col (max value)
// 92233720368547758, -- bigint_col (max value)
// 3.40E+38, -- float_col (max value)
// 1.7976931348623157E+308, -- double_col (max value)
// (1E+9 - 1)/(1E+2), -- decimal_col (max value)
// (1E+9 - 1)/(1E+2), -- numeric_col (max value)
// REPEAT('Z', 255), -- char_col (max value 255 characters)
// REPEAT('A', 10922), -- varchar_col (max value 10922 characters)
// REPEAT('A', 21845), -- text_col (max value 21845 characters)
// '9999-12-31', -- date_col (max value)
// '9999-12-31 23:59:59.999999',-- datetime_col (max value)
// '838:59:59', -- time_col (max value)
// '2038-01-01 00:59:59' -- timestamp_col (max value)
// );
// CREATE TABLE "pt1" (
// a datetime DEFAULT NULL
// ) PARTITION BY RANGE(dayofmonth(a))
// (PARTITION p0 VALUES LESS THAN (10));
//
// CREATE TABLE "pt2" (
// a datetime DEFAULT NULL
// )
// PARTITION BY LIST (time_to_sec(a))
// (PARTITION p0 VALUES IN (3,5,6,9,17));
//
// CREATE TABLE "pt3" (
// "a" int(11) DEFAULT NULL
// )
// PARTITION BY HASH (abs(a));
//
// CREATE TABLE "pt4" (
// "a" varchar(100) DEFAULT NULL
// ) ENGINE=EXPRESS DEFAULT CHARSET=utf8 TABLESPACE='sys_tablespace'
// PARTITION BY KEY (a)
// PARTITIONS 1;
// Doris Catalog
// sql """
// CREATE CATALOG `gbase` PROPERTIES (
// "user" = "root",
// "type" = "jdbc",
// "password" = "",
// "jdbc_url" = "jdbc:gbase://127.0.0.1:5258/doris_test",
// "driver_url" = "gbase-connector-java-9.5.0.7-build1-bin.jar",
// "driver_class" = "com.gbase.jdbc.Driver"
// ); """
//sql """switch gbase;"""
//sql """use doris_test;"""
//order_qt_sample_table_desc """ desc gbase_test; """
//order_qt_sample_table_select """ select * from gbase_test order by 1; """
//order_qt_show_tables """ show tables; """
// explain {
// sql("select tinyint_col from gbase_test limit 2;")
// contains "QUERY: SELECT `tinyint_col` FROM `doris_test`.`gbase_test` LIMIT 2"
// }
}