[Enhancement](jdbc catalog) Add a property to test the connection when creating a Jdbc catalog (#32125) (#32531)

This commit is contained in:
yiguolei
2024-03-21 14:05:59 +08:00
parent 27973b6999
commit 85b2c42f76
18 changed files with 331 additions and 5 deletions

View File

@ -105,6 +105,8 @@ public class JdbcResource extends Resource {
public static final String CONNECTION_POOL_KEEP_ALIVE = "connection_pool_keep_alive";
public static final String CHECK_SUM = "checksum";
public static final String CREATE_TIME = "create_time";
public static final String TEST_CONNECTION = "test_connection";
private static final ImmutableList<String> ALL_PROPERTIES = new ImmutableList.Builder<String>().add(
JDBC_URL,
USER,
@ -122,7 +124,8 @@ public class JdbcResource extends Resource {
CONNECTION_POOL_MAX_SIZE,
CONNECTION_POOL_MAX_LIFE_TIME,
CONNECTION_POOL_MAX_WAIT_TIME,
CONNECTION_POOL_KEEP_ALIVE
CONNECTION_POOL_KEEP_ALIVE,
TEST_CONNECTION
).build();
private static final ImmutableList<String> OPTIONAL_PROPERTIES = new ImmutableList.Builder<String>().add(
ONLY_SPECIFIED_DATABASE,
@ -134,7 +137,8 @@ public class JdbcResource extends Resource {
CONNECTION_POOL_MAX_SIZE,
CONNECTION_POOL_MAX_LIFE_TIME,
CONNECTION_POOL_MAX_WAIT_TIME,
CONNECTION_POOL_KEEP_ALIVE
CONNECTION_POOL_KEEP_ALIVE,
TEST_CONNECTION
).build();
// The default value of optional properties
@ -152,6 +156,7 @@ public class JdbcResource extends Resource {
OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(CONNECTION_POOL_MAX_LIFE_TIME, "1800000");
OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(CONNECTION_POOL_MAX_WAIT_TIME, "5000");
OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(CONNECTION_POOL_KEEP_ALIVE, "false");
OPTIONAL_PROPERTIES_DEFAULT_VALUE.put(TEST_CONNECTION, "true");
}
// timeout for both connection and read. 10 seconds is long enough.

View File

@ -17,7 +17,10 @@
package org.apache.doris.datasource.jdbc;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.JdbcResource;
import org.apache.doris.catalog.JdbcTable;
import org.apache.doris.catalog.TableIf.TableType;
import org.apache.doris.common.DdlException;
import org.apache.doris.datasource.CatalogProperty;
import org.apache.doris.datasource.ExternalCatalog;
@ -25,18 +28,32 @@ import org.apache.doris.datasource.InitCatalogLog;
import org.apache.doris.datasource.SessionContext;
import org.apache.doris.datasource.jdbc.client.JdbcClient;
import org.apache.doris.datasource.jdbc.client.JdbcClientConfig;
import org.apache.doris.datasource.jdbc.client.JdbcClientException;
import org.apache.doris.proto.InternalService;
import org.apache.doris.proto.InternalService.PJdbcTestConnectionRequest;
import org.apache.doris.proto.InternalService.PJdbcTestConnectionResult;
import org.apache.doris.rpc.BackendServiceProxy;
import org.apache.doris.rpc.RpcException;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TStatusCode;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.protobuf.ByteString;
import lombok.Getter;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.thrift.TException;
import org.apache.thrift.TSerializer;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
@Getter
public class JdbcExternalCatalog extends ExternalCatalog {
@ -57,6 +74,7 @@ public class JdbcExternalCatalog extends ExternalCatalog {
throws DdlException {
super(catalogId, name, InitCatalogLog.Type.JDBC, comment);
this.catalogProperty = new CatalogProperty(resource, processCompatibleProperties(props, isReplay));
testJdbcConnection(isReplay);
}
@Override
@ -74,6 +92,9 @@ public class JdbcExternalCatalog extends ExternalCatalog {
JdbcResource.checkBooleanProperty(JdbcResource.ONLY_SPECIFIED_DATABASE, getOnlySpecifiedDatabase());
JdbcResource.checkBooleanProperty(JdbcResource.LOWER_CASE_META_NAMES, getLowerCaseMetaNames());
JdbcResource.checkBooleanProperty(JdbcResource.CONNECTION_POOL_KEEP_ALIVE,
String.valueOf(isConnectionPoolKeepAlive()));
JdbcResource.checkBooleanProperty(JdbcResource.TEST_CONNECTION, String.valueOf(isTestConnection()));
JdbcResource.checkDatabaseListProperties(getOnlySpecifiedDatabase(), getIncludeDatabaseMap(),
getExcludeDatabaseMap());
JdbcResource.checkConnectionPoolProperties(getConnectionPoolMinSize(), getConnectionPoolMaxSize(),
@ -190,6 +211,10 @@ public class JdbcExternalCatalog extends ExternalCatalog {
.getDefaultPropertyValue(JdbcResource.CONNECTION_POOL_KEEP_ALIVE)));
}
public boolean isTestConnection() {
return Boolean.parseBoolean(catalogProperty.getOrDefault(JdbcResource.TEST_CONNECTION, "false"));
}
@Override
protected void initLocalObjectsImpl() {
JdbcClientConfig jdbcClientConfig = new JdbcClientConfig()
@ -268,4 +293,78 @@ public class JdbcExternalCatalog extends ExternalCatalog {
makeSureInitialized();
jdbcClient.executeStmt(stmt);
}
private void testJdbcConnection(boolean isReplay) throws DdlException {
if (!isReplay) {
if (isTestConnection()) {
try {
initLocalObjectsImpl();
testFeToJdbcConnection();
testBeToJdbcConnection();
} finally {
jdbcClient.closeClient();
jdbcClient = null;
}
}
}
}
private void testFeToJdbcConnection() throws DdlException {
try {
jdbcClient.testConnection();
} catch (JdbcClientException e) {
String errorMessage = "Test FE Connection to JDBC Failed: " + e.getMessage();
LOG.error(errorMessage, e);
throw new DdlException(errorMessage, e);
}
}
private void testBeToJdbcConnection() throws DdlException {
Backend aliveBe = null;
for (Backend be : Env.getCurrentSystemInfo().getIdToBackend().values()) {
if (be.isAlive()) {
aliveBe = be;
}
}
if (aliveBe == null) {
throw new DdlException("Test BE Connection to JDBC Failed: No Alive backends");
}
TNetworkAddress address = new TNetworkAddress(aliveBe.getHost(), aliveBe.getBrpcPort());
try {
JdbcTable jdbcTable = getTestConnectionJdbcTable();
PJdbcTestConnectionRequest request = InternalService.PJdbcTestConnectionRequest.newBuilder()
.setJdbcTable(ByteString.copyFrom(new TSerializer().serialize(jdbcTable.toThrift())))
.setJdbcTableType(jdbcTable.getJdbcTableType().getValue())
.setQueryStr(jdbcClient.getTestQuery()).build();
InternalService.PJdbcTestConnectionResult result = null;
Future<PJdbcTestConnectionResult> future = BackendServiceProxy.getInstance()
.testJdbcConnection(address, request);
result = future.get();
TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode());
if (code != TStatusCode.OK) {
throw new DdlException("Test BE Connection to JDBC Failed: " + result.getStatus().getErrorMsgs(0));
}
} catch (TException | RpcException | ExecutionException | InterruptedException e) {
throw new RuntimeException(e);
}
}
private JdbcTable getTestConnectionJdbcTable() throws DdlException {
JdbcTable jdbcTable = new JdbcTable(0, "test_jdbc_connection", Lists.newArrayList(),
TableType.JDBC_EXTERNAL_TABLE);
jdbcTable.setCatalogId(this.getId());
jdbcTable.setJdbcTypeName(this.getDatabaseTypeName());
jdbcTable.setJdbcUrl(this.getJdbcUrl());
jdbcTable.setJdbcUser(this.getJdbcUser());
jdbcTable.setJdbcPasswd(this.getJdbcPasswd());
jdbcTable.setDriverClass(this.getDriverClass());
jdbcTable.setDriverUrl(this.getDriverUrl());
jdbcTable.setCheckSum(JdbcResource.computeObjectChecksum(this.getDriverUrl()));
jdbcTable.setConnectionPoolMinSize(this.getConnectionPoolMinSize());
jdbcTable.setConnectionPoolMaxSize(this.getConnectionPoolMaxSize());
jdbcTable.setConnectionPoolMaxLifeTime(this.getConnectionPoolMaxLifeTime());
jdbcTable.setConnectionPoolMaxWaitTime(this.getConnectionPoolMaxWaitTime());
jdbcTable.setConnectionPoolKeepAlive(this.isConnectionPoolKeepAlive());
return jdbcTable;
}
}

View File

@ -453,4 +453,22 @@ public abstract class JdbcClient {
}
return ScalarType.createStringType();
}
public void testConnection() {
String testQuery = getTestQuery();
try (Connection conn = getConnection();
Statement stmt = conn.createStatement();
ResultSet rs = stmt.executeQuery(testQuery)) {
if (!rs.next()) {
throw new JdbcClientException(
"Failed to test connection in FE: query executed but returned no results.");
}
} catch (SQLException e) {
throw new JdbcClientException("Failed to test connection in FE: " + e.getMessage(), e);
}
}
public String getTestQuery() {
return "select 1";
}
}

View File

@ -45,6 +45,11 @@ public class JdbcOracleClient extends JdbcClient {
return conn.getCatalog();
}
@Override
public String getTestQuery() {
return "SELECT 1 FROM dual";
}
@Override
public List<String> getDatabaseNameList() {
Connection conn = getConnection();

View File

@ -36,6 +36,11 @@ public class JdbcSapHanaClient extends JdbcClient {
return new String[] {"TABLE", "VIEW", "OLAP VIEW", "JOIN VIEW", "HIERARCHY VIEW", "CALC VIEW"};
}
@Override
public String getTestQuery() {
return "SELECT 1 FROM DUMMY";
}
@Override
protected Type jdbcTypeToDoris(JdbcFieldSchema fieldSchema) {
String hanaType = fieldSchema.getDataTypeName();

View File

@ -110,6 +110,11 @@ public class BackendServiceClient {
return stub.fetchTableSchema(request);
}
public Future<InternalService.PJdbcTestConnectionResult> testJdbcConnection(
InternalService.PJdbcTestConnectionRequest request) {
return stub.testJdbcConnection(request);
}
public Future<InternalService.PCacheResponse> updateCache(InternalService.PUpdateCacheRequest request) {
return stub.updateCache(request);
}

View File

@ -320,6 +320,18 @@ public class BackendServiceProxy {
}
}
public Future<InternalService.PJdbcTestConnectionResult> testJdbcConnection(
TNetworkAddress address, InternalService.PJdbcTestConnectionRequest request) throws RpcException {
try {
final BackendServiceClient client = getProxy(address);
return client.testJdbcConnection(request);
} catch (Throwable e) {
LOG.warn("test jdbc connection catch a exception, address={}:{}",
address.getHostname(), address.getPort(), e);
throw new RpcException(address.hostname, e.getMessage());
}
}
public Future<InternalService.PReportStreamLoadStatusResponse> reportStreamLoadStatus(
TNetworkAddress address, InternalService.PReportStreamLoadStatusRequest request) throws RpcException {
try {