[branch-2.1][improvement](jdbc catalog) Change JdbcExecutor's error reporting from UDF to JDBC (#37635)

pick (#35692)

In the initial version, JdbcExecutor directly used UdfRuntimeException,
which could lead to misunderstanding of the exception. Therefore, I
created a separate Exception for JdbcExecutor to help us view the
exception more clearly.
This commit is contained in:
zy-kkk
2024-07-11 15:11:41 +08:00
committed by GitHub
parent ef754487d9
commit 39ded1f649
3 changed files with 59 additions and 33 deletions

View File

@ -18,7 +18,6 @@
package org.apache.doris.jdbc;
import org.apache.doris.common.exception.InternalException;
import org.apache.doris.common.exception.UdfRuntimeException;
import org.apache.doris.common.jni.utils.UdfUtils;
import org.apache.doris.common.jni.vec.ColumnType;
import org.apache.doris.common.jni.vec.ColumnValueConverter;
@ -154,19 +153,19 @@ public abstract class BaseJdbcExecutor implements JdbcExecutor {
}
}
public void testConnection() throws UdfRuntimeException {
public void testConnection() throws JdbcExecutorException {
try {
resultSet = ((PreparedStatement) stmt).executeQuery();
if (!resultSet.next()) {
throw new UdfRuntimeException(
throw new JdbcExecutorException(
"Failed to test connection in BE: query executed but returned no results.");
}
} catch (SQLException e) {
throw new UdfRuntimeException("Failed to test connection in BE: ", e);
throw new JdbcExecutorException("Failed to test connection in BE: ", e);
}
}
public int read() throws UdfRuntimeException {
public int read() throws JdbcExecutorException {
try {
resultSet = ((PreparedStatement) stmt).executeQuery();
resultSetMetaData = resultSet.getMetaData();
@ -174,11 +173,11 @@ public abstract class BaseJdbcExecutor implements JdbcExecutor {
block = new ArrayList<>(columnCount);
return columnCount;
} catch (SQLException e) {
throw new UdfRuntimeException("JDBC executor sql has error: ", e);
throw new JdbcExecutorException("JDBC executor sql has error: ", e);
}
}
public long getBlockAddress(int batchSize, Map<String, String> outputParams) throws UdfRuntimeException {
public long getBlockAddress(int batchSize, Map<String, String> outputParams) throws JdbcExecutorException {
try {
if (outputTable != null) {
outputTable.close();
@ -220,7 +219,7 @@ public abstract class BaseJdbcExecutor implements JdbcExecutor {
}
} catch (Exception e) {
LOG.warn("jdbc get block address exception: ", e);
throw new UdfRuntimeException("jdbc get block address: ", e);
throw new JdbcExecutorException("jdbc get block address: ", e);
} finally {
block.clear();
}
@ -234,44 +233,44 @@ public abstract class BaseJdbcExecutor implements JdbcExecutor {
}
}
public int write(Map<String, String> params) throws UdfRuntimeException {
public int write(Map<String, String> params) throws JdbcExecutorException {
VectorTable batchTable = VectorTable.createReadableTable(params);
// Can't release or close batchTable, it's released by c++
try {
insert(batchTable);
} catch (SQLException e) {
throw new UdfRuntimeException("JDBC executor sql has error: ", e);
throw new JdbcExecutorException("JDBC executor sql has error: ", e);
}
return batchTable.getNumRows();
}
public void openTrans() throws UdfRuntimeException {
public void openTrans() throws JdbcExecutorException {
try {
if (conn != null) {
conn.setAutoCommit(false);
}
} catch (SQLException e) {
throw new UdfRuntimeException("JDBC executor open transaction has error: ", e);
throw new JdbcExecutorException("JDBC executor open transaction has error: ", e);
}
}
public void commitTrans() throws UdfRuntimeException {
public void commitTrans() throws JdbcExecutorException {
try {
if (conn != null) {
conn.commit();
}
} catch (SQLException e) {
throw new UdfRuntimeException("JDBC executor commit transaction has error: ", e);
throw new JdbcExecutorException("JDBC executor commit transaction has error: ", e);
}
}
public void rollbackTrans() throws UdfRuntimeException {
public void rollbackTrans() throws JdbcExecutorException {
try {
if (conn != null) {
conn.rollback();
}
} catch (SQLException e) {
throw new UdfRuntimeException("JDBC executor rollback transaction has error: ", e);
throw new JdbcExecutorException("JDBC executor rollback transaction has error: ", e);
}
}
@ -279,18 +278,18 @@ public abstract class BaseJdbcExecutor implements JdbcExecutor {
return curBlockRows;
}
public boolean hasNext() throws UdfRuntimeException {
public boolean hasNext() throws JdbcExecutorException {
try {
if (resultSet == null) {
return false;
}
return resultSet.next();
} catch (SQLException e) {
throw new UdfRuntimeException("resultSet to get next error: ", e);
throw new JdbcExecutorException("resultSet to get next error: ", e);
}
}
private void init(JdbcDataSourceConfig config, String sql) throws UdfRuntimeException {
private void init(JdbcDataSourceConfig config, String sql) throws JdbcExecutorException {
ClassLoader oldClassLoader = Thread.currentThread().getContextClassLoader();
String hikariDataSourceKey = config.createCacheKey();
try {
@ -340,13 +339,14 @@ public abstract class BaseJdbcExecutor implements JdbcExecutor {
initializeStatement(conn, config, sql);
} catch (MalformedURLException e) {
throw new UdfRuntimeException("MalformedURLException to load class about " + config.getJdbcDriverUrl(), e);
throw new JdbcExecutorException("MalformedURLException to load class about "
+ config.getJdbcDriverUrl(), e);
} catch (SQLException e) {
throw new UdfRuntimeException("Initialize datasource failed: ", e);
throw new JdbcExecutorException("Initialize datasource failed: ", e);
} catch (FileNotFoundException e) {
throw new UdfRuntimeException("FileNotFoundException failed: ", e);
throw new JdbcExecutorException("FileNotFoundException failed: ", e);
} catch (Exception e) {
throw new UdfRuntimeException("Initialize datasource failed: ", e);
throw new JdbcExecutorException("Initialize datasource failed: ", e);
} finally {
Thread.currentThread().setContextClassLoader(oldClassLoader);
}

View File

@ -17,26 +17,24 @@
package org.apache.doris.jdbc;
import org.apache.doris.common.exception.UdfRuntimeException;
import java.util.Map;
public interface JdbcExecutor {
int read() throws UdfRuntimeException;
int read() throws JdbcExecutorException;
int write(Map<String, String> params) throws UdfRuntimeException;
int write(Map<String, String> params) throws JdbcExecutorException;
long getBlockAddress(int batchSize, Map<String, String> outputParams) throws UdfRuntimeException;
long getBlockAddress(int batchSize, Map<String, String> outputParams) throws JdbcExecutorException;
void close() throws UdfRuntimeException, Exception;
void close() throws JdbcExecutorException, Exception;
void openTrans() throws UdfRuntimeException;
void openTrans() throws JdbcExecutorException;
void commitTrans() throws UdfRuntimeException;
void commitTrans() throws JdbcExecutorException;
void rollbackTrans() throws UdfRuntimeException;
void rollbackTrans() throws JdbcExecutorException;
int getCurBlockRows();
boolean hasNext() throws UdfRuntimeException;
boolean hasNext() throws JdbcExecutorException;
}

View File

@ -0,0 +1,28 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.jdbc;
public class JdbcExecutorException extends Exception {
public JdbcExecutorException(String msg, Throwable cause) {
super(msg, cause);
}
public JdbcExecutorException(String msg) {
super(msg);
}
}