[improvement](jdbc) add profile for jdbc read and convert phase (#23962)

Add 2 metrics in jdbc scan node profile:
- `CallJniNextTime`: call get next from jdbc result set
- `ConvertBatchTime`: call convert jobject to columm block

Also fix a potential concurrency issue when init jdbc connection cache pool
This commit is contained in:
Mingyu Chen
2023-09-10 21:42:06 +08:00
committed by GitHub
parent ff92b5bc29
commit f85da7d942
4 changed files with 45 additions and 21 deletions

View File

@ -50,6 +50,10 @@ NewJdbcScanner::NewJdbcScanner(RuntimeState* state, NewJdbcScanNode* parent, int
_init_connector_timer = ADD_TIMER(get_parent()->_scanner_profile, "InitConnectorTime");
_check_type_timer = ADD_TIMER(get_parent()->_scanner_profile, "CheckTypeTime");
_get_data_timer = ADD_TIMER(get_parent()->_scanner_profile, "GetDataTime");
_call_jni_next_timer =
ADD_CHILD_TIMER(get_parent()->_scanner_profile, "CallJniNextTime", "GetDataTime");
_convert_batch_timer =
ADD_CHILD_TIMER(get_parent()->_scanner_profile, "ConvertBatchTime", "GetDataTime");
_execte_read_timer = ADD_TIMER(get_parent()->_scanner_profile, "ExecteReadTime");
_connector_close_timer = ADD_TIMER(get_parent()->_scanner_profile, "ConnectorCloseTime");
}
@ -186,6 +190,8 @@ void NewJdbcScanner::_update_profile() {
COUNTER_UPDATE(_init_connector_timer, jdbc_statistic._init_connector_timer);
COUNTER_UPDATE(_check_type_timer, jdbc_statistic._check_type_timer);
COUNTER_UPDATE(_get_data_timer, jdbc_statistic._get_data_timer);
COUNTER_UPDATE(_call_jni_next_timer, jdbc_statistic._call_jni_next_timer);
COUNTER_UPDATE(_convert_batch_timer, jdbc_statistic._convert_batch_timer);
COUNTER_UPDATE(_execte_read_timer, jdbc_statistic._execte_read_timer);
COUNTER_UPDATE(_connector_close_timer, jdbc_statistic._connector_close_timer);
}

View File

@ -60,6 +60,8 @@ protected:
RuntimeProfile::Counter* _load_jar_timer = nullptr;
RuntimeProfile::Counter* _init_connector_timer = nullptr;
RuntimeProfile::Counter* _get_data_timer = nullptr;
RuntimeProfile::Counter* _call_jni_next_timer = nullptr;
RuntimeProfile::Counter* _convert_batch_timer = nullptr;
RuntimeProfile::Counter* _check_type_timer = nullptr;
RuntimeProfile::Counter* _execte_read_timer = nullptr;
RuntimeProfile::Counter* _connector_close_timer = nullptr;

View File

@ -64,6 +64,8 @@ public:
int64_t _load_jar_timer = 0;
int64_t _init_connector_timer = 0;
int64_t _get_data_timer = 0;
int64_t _call_jni_next_timer = 0;
int64_t _convert_batch_timer = 0;
int64_t _check_type_timer = 0;
int64_t _execte_read_timer = 0;
int64_t _connector_close_timer = 0;

View File

@ -85,6 +85,7 @@ public class JdbcExecutor {
private int curBlockRows = 0;
private static final byte[] emptyBytes = new byte[0];
private DruidDataSource druidDataSource = null;
private byte[] druidDataSourceLock = new byte[0];
private int minPoolSize;
private int maxPoolSize;
private int minIdleSize;
@ -410,29 +411,41 @@ public class JdbcExecutor {
ClassLoader classLoader = UdfUtils.getClassLoader(driverUrl, parent);
druidDataSource = JdbcDataSource.getDataSource().getSource(jdbcUrl + jdbcUser + jdbcPassword);
if (druidDataSource == null) {
DruidDataSource ds = new DruidDataSource();
ds.setDriverClassLoader(classLoader);
ds.setDriverClassName(driverClass);
ds.setUrl(jdbcUrl);
ds.setUsername(jdbcUser);
ds.setPassword(jdbcPassword);
ds.setMinIdle(minIdleSize);
ds.setInitialSize(minPoolSize);
ds.setMaxActive(maxPoolSize);
ds.setMaxWait(maxWaitTime);
ds.setTestWhileIdle(true);
ds.setTestOnBorrow(false);
setValidationQuery(ds, tableType);
ds.setTimeBetweenEvictionRunsMillis(maxIdleTime / 5);
ds.setMinEvictableIdleTimeMillis(maxIdleTime);
druidDataSource = ds;
// here is a cache of datasource, which using the string(jdbcUrl + jdbcUser +
// jdbcPassword) as key.
// and the default datasource init = 1, min = 1, max = 100, if one of connection idle
// time greater than 10 minutes. then connection will be retrieved.
JdbcDataSource.getDataSource().putSource(jdbcUrl + jdbcUser + jdbcPassword, ds);
synchronized (druidDataSourceLock) {
druidDataSource = JdbcDataSource.getDataSource().getSource(jdbcUrl + jdbcUser + jdbcPassword);
if (druidDataSource == null) {
long start = System.currentTimeMillis();
DruidDataSource ds = new DruidDataSource();
ds.setDriverClassLoader(classLoader);
ds.setDriverClassName(driverClass);
ds.setUrl(jdbcUrl);
ds.setUsername(jdbcUser);
ds.setPassword(jdbcPassword);
ds.setMinIdle(minIdleSize);
ds.setInitialSize(minPoolSize);
ds.setMaxActive(maxPoolSize);
ds.setMaxWait(maxWaitTime);
ds.setTestWhileIdle(true);
ds.setTestOnBorrow(false);
setValidationQuery(ds, tableType);
ds.setTimeBetweenEvictionRunsMillis(maxIdleTime / 5);
ds.setMinEvictableIdleTimeMillis(maxIdleTime);
druidDataSource = ds;
// here is a cache of datasource, which using the string(jdbcUrl + jdbcUser +
// jdbcPassword) as key.
// and the default datasource init = 1, min = 1, max = 100, if one of connection idle
// time greater than 10 minutes. then connection will be retrieved.
JdbcDataSource.getDataSource().putSource(jdbcUrl + jdbcUser + jdbcPassword, ds);
LOG.info("init datasource [" + (jdbcUrl + jdbcUser) + "] cost: " + (
System.currentTimeMillis() - start) + " ms");
}
}
}
long start = System.currentTimeMillis();
conn = druidDataSource.getConnection();
LOG.info("get connection [" + (jdbcUrl + jdbcUser) + "] cost: " + (System.currentTimeMillis() - start)
+ " ms");
if (op == TJdbcOperation.READ) {
conn.setAutoCommit(false);
Preconditions.checkArgument(sql != null);
@ -2140,3 +2153,4 @@ public class JdbcExecutor {
return i;
}
}