diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 7a37d61326..91a3d1ea51 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -39,6 +39,7 @@ #include #include #include +#include #include #include @@ -786,6 +787,65 @@ void PInternalServiceImpl::tablet_fetch_data(google::protobuf::RpcController* co } } +void PInternalServiceImpl::test_jdbc_connection(google::protobuf::RpcController* controller, + const PJdbcTestConnectionRequest* request, + PJdbcTestConnectionResult* result, + google::protobuf::Closure* done) { + bool ret = _heavy_work_pool.try_offer([request, result, done]() { + VLOG_RPC << "test jdbc connection"; + brpc::ClosureGuard closure_guard(done); + TTableDescriptor table_desc; + vectorized::JdbcConnectorParam jdbc_param; + Status st = Status::OK(); + { + const uint8_t* buf = (const uint8_t*)request->jdbc_table().data(); + uint32_t len = request->jdbc_table().size(); + st = deserialize_thrift_msg(buf, &len, false, &table_desc); + if (!st.ok()) { + LOG(WARNING) << "test jdbc connection failed, errmsg=" << st; + st.to_protobuf(result->mutable_status()); + return; + } + } + TJdbcTable jdbc_table = (table_desc.jdbcTable); + jdbc_param.catalog_id = jdbc_table.catalog_id; + jdbc_param.driver_class = jdbc_table.jdbc_driver_class; + jdbc_param.driver_path = jdbc_table.jdbc_driver_url; + jdbc_param.driver_checksum = jdbc_table.jdbc_driver_checksum; + jdbc_param.jdbc_url = jdbc_table.jdbc_url; + jdbc_param.user = jdbc_table.jdbc_user; + jdbc_param.passwd = jdbc_table.jdbc_password; + jdbc_param.query_string = request->query_str(); + jdbc_param.table_type = static_cast(request->jdbc_table_type()); + jdbc_param.use_transaction = false; + jdbc_param.connection_pool_min_size = jdbc_table.connection_pool_min_size; + jdbc_param.connection_pool_max_size = jdbc_table.connection_pool_max_size; + jdbc_param.connection_pool_max_life_time = jdbc_table.connection_pool_max_life_time; + jdbc_param.connection_pool_max_wait_time = jdbc_table.connection_pool_max_wait_time; + jdbc_param.connection_pool_keep_alive = jdbc_table.connection_pool_keep_alive; + + std::unique_ptr jdbc_connector; + jdbc_connector.reset(new (std::nothrow) vectorized::JdbcConnector(jdbc_param)); + + st = jdbc_connector->test_connection(); + st.to_protobuf(result->mutable_status()); + + Status clean_st = jdbc_connector->clean_datasource(); + if (!clean_st.ok()) { + LOG(WARNING) << "Failed to clean JDBC datasource: " << clean_st.msg(); + } + Status close_st = jdbc_connector->close(); + if (!close_st.ok()) { + LOG(WARNING) << "Failed to close JDBC connector: " << close_st.msg(); + } + }); + + if (!ret) { + offer_failed(result, done, _heavy_work_pool); + return; + } +} + void PInternalServiceImpl::get_column_ids_by_tablet_ids(google::protobuf::RpcController* controller, const PFetchColIdsRequest* request, PFetchColIdsResponse* response, diff --git a/be/src/service/internal_service.h b/be/src/service/internal_service.h index 261a3d161d..4fffdc3387 100644 --- a/be/src/service/internal_service.h +++ b/be/src/service/internal_service.h @@ -214,6 +214,11 @@ public: PGetWalQueueSizeResponse* response, google::protobuf::Closure* done) override; + void test_jdbc_connection(google::protobuf::RpcController* controller, + const PJdbcTestConnectionRequest* request, + PJdbcTestConnectionResult* result, + google::protobuf::Closure* done) override; + private: void _exec_plan_fragment_in_pthread(google::protobuf::RpcController* controller, const PExecPlanFragmentRequest* request, diff --git a/be/src/vec/exec/vjdbc_connector.cpp b/be/src/vec/exec/vjdbc_connector.cpp index 318651ea88..6a5aa0fdc3 100644 --- a/be/src/vec/exec/vjdbc_connector.cpp +++ b/be/src/vec/exec/vjdbc_connector.cpp @@ -164,7 +164,11 @@ Status JdbcConnector::open(RuntimeState* state, bool read) { ctor_params.__set_jdbc_password(_conn_param.passwd); ctor_params.__set_jdbc_driver_class(_conn_param.driver_class); ctor_params.__set_driver_path(local_location); - ctor_params.__set_batch_size(read ? state->batch_size() : 0); + if (state == nullptr) { + ctor_params.__set_batch_size(read ? 1 : 0); + } else { + ctor_params.__set_batch_size(read ? state->batch_size() : 0); + } ctor_params.__set_op(read ? TJdbcOperation::READ : TJdbcOperation::WRITE); ctor_params.__set_table_type(_conn_param.table_type); ctor_params.__set_connection_pool_min_size(_conn_param.connection_pool_min_size); @@ -195,6 +199,23 @@ Status JdbcConnector::open(RuntimeState* state, bool read) { return Status::OK(); } +Status JdbcConnector::test_connection() { + RETURN_IF_ERROR(open(nullptr, true)); + + JNIEnv* env = nullptr; + RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env)); + + env->CallNonvirtualVoidMethod(_executor_obj, _executor_clazz, _executor_test_connection_id); + return JniUtil::GetJniExceptionMsg(env); +} + +Status JdbcConnector::clean_datasource() { + JNIEnv* env = nullptr; + RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env)); + env->CallNonvirtualVoidMethod(_executor_obj, _executor_clazz, _executor_clean_datasource_id); + return JniUtil::GetJniExceptionMsg(env); +} + Status JdbcConnector::query() { if (!_is_open) { return Status::InternalError("Query before open of JdbcConnector."); @@ -380,6 +401,10 @@ Status JdbcConnector::_register_func_id(JNIEnv* env) { JDBC_EXECUTOR_TRANSACTION_SIGNATURE, _executor_abort_trans_id)); RETURN_IF_ERROR(register_id(_executor_clazz, "getResultColumnTypeNames", JDBC_EXECUTOR_GET_TYPES_SIGNATURE, _executor_get_types_id)); + RETURN_IF_ERROR( + register_id(_executor_clazz, "testConnection", "()V", _executor_test_connection_id)); + RETURN_IF_ERROR( + register_id(_executor_clazz, "cleanDataSource", "()V", _executor_clean_datasource_id)); return Status::OK(); } diff --git a/be/src/vec/exec/vjdbc_connector.h b/be/src/vec/exec/vjdbc_connector.h index ed2afdecdf..e42097b3ab 100644 --- a/be/src/vec/exec/vjdbc_connector.h +++ b/be/src/vec/exec/vjdbc_connector.h @@ -113,6 +113,9 @@ public: Status close(Status s = Status::OK()) override; + Status test_connection(); + Status clean_datasource(); + protected: JdbcConnectorParam _conn_param; @@ -155,6 +158,8 @@ private: jmethodID _executor_begin_trans_id; jmethodID _executor_finish_trans_id; jmethodID _executor_abort_trans_id; + jmethodID _executor_test_connection_id; + jmethodID _executor_clean_datasource_id; std::map _map_column_idx_to_cast_idx_hll; std::vector _input_hll_string_types; diff --git a/docs/en/docs/lakehouse/multi-catalog/jdbc.md b/docs/en/docs/lakehouse/multi-catalog/jdbc.md index 6b3682c61f..c53dbec7ac 100644 --- a/docs/en/docs/lakehouse/multi-catalog/jdbc.md +++ b/docs/en/docs/lakehouse/multi-catalog/jdbc.md @@ -56,6 +56,8 @@ PROPERTIES ("key" = "value", ...) | `meta_names_mapping` | No | | When the JDBC external data source has the same name but different case, e.g. DORIS and doris, Doris reports an error when querying the catalog due to ambiguity. In this case, the `meta_names_mapping` parameter needs to be specified to resolve the conflict. | | `include_database_list` | No | | When `only_specified_database = true`,only synchronize the specified databases. Separate with `,`. Database name is case sensitive. | | `exclude_database_list` | No | | When `only_specified_database = true`,do not synchronize the specified databases. Separate with `,`. Database name is case sensitive. | +| `test_connection` | No | "true" | Whether to test the connection when creating the catalog. If set to `true`, the connection will be tested when creating the catalog and will refuse to create the catalog if the connection fails. If set to `false`, the connection will not be tested. | + ### Driver path diff --git a/docs/zh-CN/docs/lakehouse/multi-catalog/jdbc.md b/docs/zh-CN/docs/lakehouse/multi-catalog/jdbc.md index dde24e1eb5..88cafda0d4 100644 --- a/docs/zh-CN/docs/lakehouse/multi-catalog/jdbc.md +++ b/docs/zh-CN/docs/lakehouse/multi-catalog/jdbc.md @@ -44,7 +44,7 @@ PROPERTIES ("key"="value", ...) ## 参数说明 -| 参数 | 必须 | 默认值 | 说明 | +| 参数 | 必须 | 默认值 | 说明 | |---------------------------|-----|---------|-----------------------------------------------------------------------| | `user` | 是 | | 对应数据库的用户名 | | `password` | 是 | | 对应数据库的密码 | @@ -56,6 +56,7 @@ PROPERTIES ("key"="value", ...) | `only_specified_database` | 否 | "false" | 指定是否只同步指定的 database | | `include_database_list` | 否 | "" | 当only_specified_database=true时,指定同步多个database,以','分隔。db名称是大小写敏感的。 | | `exclude_database_list` | 否 | "" | 当only_specified_database=true时,指定不需要同步的多个database,以','分割。db名称是大小写敏感的。 | +| `test_connection` | 否 | "true" | 是否在创建 Catalog 时测试连接。如果设置为 `true`,则会在创建 Catalog 时测试连接,如果连接失败,则会拒绝创建 Catalog。如果设置为 `false`,则不会测试连接。 | ### 驱动包路径 diff --git a/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/BaseJdbcExecutor.java b/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/BaseJdbcExecutor.java index b24002b18a..a1330b8df1 100644 --- a/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/BaseJdbcExecutor.java +++ b/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/BaseJdbcExecutor.java @@ -150,6 +150,26 @@ public abstract class BaseJdbcExecutor implements JdbcExecutor { return false; } + public void cleanDataSource() { + if (druidDataSource != null) { + druidDataSource.close(); + JdbcDataSource.getDataSource().getSourcesMap().remove(config.createCacheKey()); + druidDataSource = null; + } + } + + public void testConnection() throws UdfRuntimeException { + try { + resultSet = ((PreparedStatement) stmt).executeQuery(); + if (!resultSet.next()) { + throw new UdfRuntimeException( + "Failed to test connection in BE: query executed but returned no results."); + } + } catch (SQLException e) { + throw new UdfRuntimeException("Failed to test connection in BE: ", e); + } + } + public int read() throws UdfRuntimeException { try { resultSet = ((PreparedStatement) stmt).executeQuery(); diff --git a/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/DefaultJdbcExecutor.java b/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/DefaultJdbcExecutor.java index 94d4304db3..aaa13a0f2d 100644 --- a/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/DefaultJdbcExecutor.java +++ b/fe/be-java-extensions/jdbc-scanner/src/main/java/org/apache/doris/jdbc/DefaultJdbcExecutor.java @@ -184,6 +184,26 @@ public class DefaultJdbcExecutor { return false; } + public void cleanDataSource() { + if (druidDataSource != null) { + druidDataSource.close(); + JdbcDataSource.getDataSource().getSourcesMap().remove(config.createCacheKey()); + druidDataSource = null; + } + } + + public void testConnection() throws UdfRuntimeException { + try { + resultSet = ((PreparedStatement) stmt).executeQuery(); + if (!resultSet.next()) { + throw new UdfRuntimeException( + "Failed to test connection in BE: query executed but returned no results."); + } + } catch (SQLException e) { + throw new UdfRuntimeException("Failed to test connection in BE: ", e); + } + } + public int read() throws UdfRuntimeException { try { resultSet = ((PreparedStatement) stmt).executeQuery(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcResource.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcResource.java index ab6dd7d41f..65bf7d308f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcResource.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/JdbcResource.java @@ -105,6 +105,8 @@ public class JdbcResource extends Resource { public static final String CONNECTION_POOL_KEEP_ALIVE = "connection_pool_keep_alive"; public static final String CHECK_SUM = "checksum"; public static final String CREATE_TIME = "create_time"; + public static final String TEST_CONNECTION = "test_connection"; + private static final ImmutableList ALL_PROPERTIES = new ImmutableList.Builder().add( JDBC_URL, USER, @@ -122,7 +124,8 @@ public class JdbcResource extends Resource { CONNECTION_POOL_MAX_SIZE, CONNECTION_POOL_MAX_LIFE_TIME, CONNECTION_POOL_MAX_WAIT_TIME, - CONNECTION_POOL_KEEP_ALIVE + CONNECTION_POOL_KEEP_ALIVE, + TEST_CONNECTION ).build(); private static final ImmutableList OPTIONAL_PROPERTIES = new ImmutableList.Builder().add( ONLY_SPECIFIED_DATABASE, @@ -134,7 +137,8 @@ public class JdbcResource extends Resource { CONNECTION_POOL_MAX_SIZE, CONNECTION_POOL_MAX_LIFE_TIME, CONNECTION_POOL_MAX_WAIT_TIME, - CONNECTION_POOL_KEEP_ALIVE + CONNECTION_POOL_KEEP_ALIVE, + TEST_CONNECTION ).build(); // The default value of optional properties @@ -152,6 +156,7 @@ public class JdbcResource extends Resource { OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(CONNECTION_POOL_MAX_LIFE_TIME, "1800000"); OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(CONNECTION_POOL_MAX_WAIT_TIME, "5000"); OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(CONNECTION_POOL_KEEP_ALIVE, "false"); + OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(TEST_CONNECTION, "true"); } // timeout for both connection and read. 10 seconds is long enough. diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java index 93ebb1b213..0edabdaca0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/JdbcExternalCatalog.java @@ -17,7 +17,10 @@ package org.apache.doris.datasource.jdbc; +import org.apache.doris.catalog.Env; import org.apache.doris.catalog.JdbcResource; +import org.apache.doris.catalog.JdbcTable; +import org.apache.doris.catalog.TableIf.TableType; import org.apache.doris.common.DdlException; import org.apache.doris.datasource.CatalogProperty; import org.apache.doris.datasource.ExternalCatalog; @@ -25,18 +28,32 @@ import org.apache.doris.datasource.InitCatalogLog; import org.apache.doris.datasource.SessionContext; import org.apache.doris.datasource.jdbc.client.JdbcClient; import org.apache.doris.datasource.jdbc.client.JdbcClientConfig; +import org.apache.doris.datasource.jdbc.client.JdbcClientException; +import org.apache.doris.proto.InternalService; +import org.apache.doris.proto.InternalService.PJdbcTestConnectionRequest; +import org.apache.doris.proto.InternalService.PJdbcTestConnectionResult; +import org.apache.doris.rpc.BackendServiceProxy; +import org.apache.doris.rpc.RpcException; +import org.apache.doris.system.Backend; +import org.apache.doris.thrift.TNetworkAddress; +import org.apache.doris.thrift.TStatusCode; import com.google.common.base.Strings; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.protobuf.ByteString; import lombok.Getter; import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.thrift.TException; +import org.apache.thrift.TSerializer; import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; @Getter public class JdbcExternalCatalog extends ExternalCatalog { @@ -57,6 +74,7 @@ public class JdbcExternalCatalog extends ExternalCatalog { throws DdlException { super(catalogId, name, InitCatalogLog.Type.JDBC, comment); this.catalogProperty = new CatalogProperty(resource, processCompatibleProperties(props, isReplay)); + testJdbcConnection(isReplay); } @Override @@ -74,6 +92,9 @@ public class JdbcExternalCatalog extends ExternalCatalog { JdbcResource.checkBooleanProperty(JdbcResource.ONLY_SPECIFIED_DATABASE, getOnlySpecifiedDatabase()); JdbcResource.checkBooleanProperty(JdbcResource.LOWER_CASE_META_NAMES, getLowerCaseMetaNames()); + JdbcResource.checkBooleanProperty(JdbcResource.CONNECTION_POOL_KEEP_ALIVE, + String.valueOf(isConnectionPoolKeepAlive())); + JdbcResource.checkBooleanProperty(JdbcResource.TEST_CONNECTION, String.valueOf(isTestConnection())); JdbcResource.checkDatabaseListProperties(getOnlySpecifiedDatabase(), getIncludeDatabaseMap(), getExcludeDatabaseMap()); JdbcResource.checkConnectionPoolProperties(getConnectionPoolMinSize(), getConnectionPoolMaxSize(), @@ -190,6 +211,10 @@ public class JdbcExternalCatalog extends ExternalCatalog { .getDefaultPropertyValue(JdbcResource.CONNECTION_POOL_KEEP_ALIVE))); } + public boolean isTestConnection() { + return Boolean.parseBoolean(catalogProperty.getOrDefault(JdbcResource.TEST_CONNECTION, "false")); + } + @Override protected void initLocalObjectsImpl() { JdbcClientConfig jdbcClientConfig = new JdbcClientConfig() @@ -268,4 +293,78 @@ public class JdbcExternalCatalog extends ExternalCatalog { makeSureInitialized(); jdbcClient.executeStmt(stmt); } + + private void testJdbcConnection(boolean isReplay) throws DdlException { + if (!isReplay) { + if (isTestConnection()) { + try { + initLocalObjectsImpl(); + testFeToJdbcConnection(); + testBeToJdbcConnection(); + } finally { + jdbcClient.closeClient(); + jdbcClient = null; + } + } + } + } + + private void testFeToJdbcConnection() throws DdlException { + try { + jdbcClient.testConnection(); + } catch (JdbcClientException e) { + String errorMessage = "Test FE Connection to JDBC Failed: " + e.getMessage(); + LOG.error(errorMessage, e); + throw new DdlException(errorMessage, e); + } + } + + private void testBeToJdbcConnection() throws DdlException { + Backend aliveBe = null; + for (Backend be : Env.getCurrentSystemInfo().getIdToBackend().values()) { + if (be.isAlive()) { + aliveBe = be; + } + } + if (aliveBe == null) { + throw new DdlException("Test BE Connection to JDBC Failed: No Alive backends"); + } + TNetworkAddress address = new TNetworkAddress(aliveBe.getHost(), aliveBe.getBrpcPort()); + try { + JdbcTable jdbcTable = getTestConnectionJdbcTable(); + PJdbcTestConnectionRequest request = InternalService.PJdbcTestConnectionRequest.newBuilder() + .setJdbcTable(ByteString.copyFrom(new TSerializer().serialize(jdbcTable.toThrift()))) + .setJdbcTableType(jdbcTable.getJdbcTableType().getValue()) + .setQueryStr(jdbcClient.getTestQuery()).build(); + InternalService.PJdbcTestConnectionResult result = null; + Future future = BackendServiceProxy.getInstance() + .testJdbcConnection(address, request); + result = future.get(); + TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode()); + if (code != TStatusCode.OK) { + throw new DdlException("Test BE Connection to JDBC Failed: " + result.getStatus().getErrorMsgs(0)); + } + } catch (TException | RpcException | ExecutionException | InterruptedException e) { + throw new RuntimeException(e); + } + } + + private JdbcTable getTestConnectionJdbcTable() throws DdlException { + JdbcTable jdbcTable = new JdbcTable(0, "test_jdbc_connection", Lists.newArrayList(), + TableType.JDBC_EXTERNAL_TABLE); + jdbcTable.setCatalogId(this.getId()); + jdbcTable.setJdbcTypeName(this.getDatabaseTypeName()); + jdbcTable.setJdbcUrl(this.getJdbcUrl()); + jdbcTable.setJdbcUser(this.getJdbcUser()); + jdbcTable.setJdbcPasswd(this.getJdbcPasswd()); + jdbcTable.setDriverClass(this.getDriverClass()); + jdbcTable.setDriverUrl(this.getDriverUrl()); + jdbcTable.setCheckSum(JdbcResource.computeObjectChecksum(this.getDriverUrl())); + jdbcTable.setConnectionPoolMinSize(this.getConnectionPoolMinSize()); + jdbcTable.setConnectionPoolMaxSize(this.getConnectionPoolMaxSize()); + jdbcTable.setConnectionPoolMaxLifeTime(this.getConnectionPoolMaxLifeTime()); + jdbcTable.setConnectionPoolMaxWaitTime(this.getConnectionPoolMaxWaitTime()); + jdbcTable.setConnectionPoolKeepAlive(this.isConnectionPoolKeepAlive()); + return jdbcTable; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClient.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClient.java index 98ed34204e..a211c0d0e3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcClient.java @@ -453,4 +453,22 @@ public abstract class JdbcClient { } return ScalarType.createStringType(); } + + public void testConnection() { + String testQuery = getTestQuery(); + try (Connection conn = getConnection(); + Statement stmt = conn.createStatement(); + ResultSet rs = stmt.executeQuery(testQuery)) { + if (!rs.next()) { + throw new JdbcClientException( + "Failed to test connection in FE: query executed but returned no results."); + } + } catch (SQLException e) { + throw new JdbcClientException("Failed to test connection in FE: " + e.getMessage(), e); + } + } + + public String getTestQuery() { + return "select 1"; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcOracleClient.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcOracleClient.java index 4d536a4ef3..4afec31c9d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcOracleClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcOracleClient.java @@ -45,6 +45,11 @@ public class JdbcOracleClient extends JdbcClient { return conn.getCatalog(); } + @Override + public String getTestQuery() { + return "SELECT 1 FROM dual"; + } + @Override public List getDatabaseNameList() { Connection conn = getConnection(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcSapHanaClient.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcSapHanaClient.java index 4e29bba7d7..1d5f590ed4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcSapHanaClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/client/JdbcSapHanaClient.java @@ -36,6 +36,11 @@ public class JdbcSapHanaClient extends JdbcClient { return new String[] {"TABLE", "VIEW", "OLAP VIEW", "JOIN VIEW", "HIERARCHY VIEW", "CALC VIEW"}; } + @Override + public String getTestQuery() { + return "SELECT 1 FROM DUMMY"; + } + @Override protected Type jdbcTypeToDoris(JdbcFieldSchema fieldSchema) { String hanaType = fieldSchema.getDataTypeName(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java index f836375573..50c24a4233 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java +++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java @@ -110,6 +110,11 @@ public class BackendServiceClient { return stub.fetchTableSchema(request); } + public Future testJdbcConnection( + InternalService.PJdbcTestConnectionRequest request) { + return stub.testJdbcConnection(request); + } + public Future updateCache(InternalService.PUpdateCacheRequest request) { return stub.updateCache(request); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java index df9a90433d..d78e055a1a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java +++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java @@ -320,6 +320,18 @@ public class BackendServiceProxy { } } + public Future testJdbcConnection( + TNetworkAddress address, InternalService.PJdbcTestConnectionRequest request) throws RpcException { + try { + final BackendServiceClient client = getProxy(address); + return client.testJdbcConnection(request); + } catch (Throwable e) { + LOG.warn("test jdbc connection catch a exception, address={}:{}", + address.getHostname(), address.getPort(), e); + throw new RpcException(address.hostname, e.getMessage()); + } + } + public Future reportStreamLoadStatus( TNetworkAddress address, InternalService.PReportStreamLoadStatusRequest request) throws RpcException { try { diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto index 8b9cce2acb..8b4795c4b7 100644 --- a/gensrc/proto/internal_service.proto +++ b/gensrc/proto/internal_service.proto @@ -642,6 +642,16 @@ message PFetchTableSchemaResult { repeated PTypeDesc column_types = 4; } +message PJdbcTestConnectionRequest { + optional bytes jdbc_table = 1; + optional int32 jdbc_table_type = 2; + optional string query_str = 3; +} + +message PJdbcTestConnectionResult { + optional PStatus status = 1; +} + message PRowLocation { optional int64 tablet_id = 1; optional string rowset_id = 2; @@ -879,5 +889,6 @@ service PBackendService { rpc get_wal_queue_size(PGetWalQueueSizeRequest) returns(PGetWalQueueSizeResponse); rpc fetch_arrow_flight_schema(PFetchArrowFlightSchemaRequest) returns (PFetchArrowFlightSchemaResult); rpc fetch_remote_tablet_schema(PFetchRemoteSchemaRequest) returns (PFetchRemoteSchemaResponse); + rpc test_jdbc_connection(PJdbcTestConnectionRequest) returns (PJdbcTestConnectionResult); }; diff --git a/regression-test/data/external_table_p0/jdbc/test_clickhouse_jdbc_catalog.out b/regression-test/data/external_table_p0/jdbc/test_clickhouse_jdbc_catalog.out index 4eba3e4364..73dc89da69 100644 Binary files a/regression-test/data/external_table_p0/jdbc/test_clickhouse_jdbc_catalog.out and b/regression-test/data/external_table_p0/jdbc/test_clickhouse_jdbc_catalog.out differ diff --git a/regression-test/suites/external_table_p0/jdbc/test_clickhouse_jdbc_catalog.groovy b/regression-test/suites/external_table_p0/jdbc/test_clickhouse_jdbc_catalog.groovy index 4b8aee3abe..bc67d89ff4 100644 --- a/regression-test/suites/external_table_p0/jdbc/test_clickhouse_jdbc_catalog.groovy +++ b/regression-test/suites/external_table_p0/jdbc/test_clickhouse_jdbc_catalog.groovy @@ -101,6 +101,32 @@ suite("test_clickhouse_jdbc_catalog", "p0,external,clickhouse,external_docker,ex order_qt_dt_with_tz """ select * from dt_with_tz order by id; """ + sql """create catalog if not exists clickhouse_catalog_test_conn_correct properties( + "type"="jdbc", + "user"="default", + "password"="123456", + "jdbc_url" = "jdbc:clickhouse://${externalEnvIp}:${clickhouse_port}/doris_test", + "driver_url" = "${driver_url}", + "driver_class" = "com.clickhouse.jdbc.ClickHouseDriver", + "test_connection" = "true" + ); + """ + order_qt_test_conn_correct """ select * from clickhouse_catalog_test_conn_correct.doris_test.type; """ + + test { + sql """create catalog if not exists clickhouse_catalog_test_conn_mistake properties( + "type"="jdbc", + "user"="default", + "password"="1234567", + "jdbc_url" = "jdbc:clickhouse://${externalEnvIp}:${clickhouse_port}/doris_test", + "driver_url" = "${driver_url}", + "driver_class" = "com.clickhouse.jdbc.ClickHouseDriver", + "test_connection" = "true" + ); + """ + exception "Test FE Connection to JDBC Failed: Can not connect to jdbc due to error: Code: 516. DB::Exception: default: Authentication failed: password is incorrect, or there is no user with such name." + } + }finally { res_dbs_log = sql "show databases;" for(int i = 0;i < res_dbs_log.size();i++) { @@ -108,7 +134,9 @@ suite("test_clickhouse_jdbc_catalog", "p0,external,clickhouse,external_docker,ex log.info( "database = ${res_dbs_log[i][0]} => tables = "+tbs.toString()) } } - + sql """ drop catalog if exists ${catalog_name} """ + sql """ drop catalog if exists clickhouse_catalog_test_conn_correct """ + sql """ drop catalog if exists clickhouse_catalog_test_conn_mistake """ } }