fix reuse and recycle dblink connection bug
This commit is contained in:
@ -102,13 +102,14 @@ public:
|
|||||||
const common::ObString &conn_str,
|
const common::ObString &conn_str,
|
||||||
const common::ObString &cluster_str,
|
const common::ObString &cluster_str,
|
||||||
const dblink_param_ctx ¶m_ctx) = 0;
|
const dblink_param_ctx ¶m_ctx) = 0;
|
||||||
virtual int acquire_dblink(uint64_t dblink_id, const dblink_param_ctx ¶m_ctx,
|
virtual int acquire_dblink(uint64_t tenant_id, uint64_t dblink_id, const dblink_param_ctx ¶m_ctx,
|
||||||
ObISQLConnection *&dblink_conn, uint32_t sessid,
|
ObISQLConnection *&dblink_conn, uint32_t sessid,
|
||||||
int64_t sql_request_level = 0) = 0;
|
int64_t sql_request_level = 0) = 0;
|
||||||
virtual int release_dblink(ObISQLConnection *dblink_conn, uint32_t sessid = 0) = 0;
|
virtual int release_dblink(ObISQLConnection *dblink_conn, uint32_t sessid = 0) = 0;
|
||||||
virtual int do_acquire_dblink(uint64_t dblink_id, const dblink_param_ctx ¶m_ctx,
|
virtual int do_acquire_dblink(uint64_t tenant_id, uint64_t dblink_id, const dblink_param_ctx ¶m_ctx,
|
||||||
ObISQLConnection *&dblink_conn, uint32_t sessid) = 0;
|
ObISQLConnection *&dblink_conn, uint32_t sessid) = 0;
|
||||||
virtual int try_connect_dblink(ObISQLConnection *dblink_conn, int64_t sql_request_level = 0) = 0;
|
virtual int try_connect_dblink(ObISQLConnection *dblink_conn, int64_t sql_request_level = 0) = 0;
|
||||||
|
virtual int clean_dblink_connection(uint64_t tenant_id) = 0;
|
||||||
};
|
};
|
||||||
|
|
||||||
} // end namespace sqlclient
|
} // end namespace sqlclient
|
||||||
|
|||||||
@ -757,11 +757,11 @@ int ObMySQLConnectionPool::create_dblink_pool(uint64_t tenant_id, uint64_t dblin
|
|||||||
const common::ObString &cluster_str,
|
const common::ObString &cluster_str,
|
||||||
const dblink_param_ctx ¶m_ctx)
|
const dblink_param_ctx ¶m_ctx)
|
||||||
{
|
{
|
||||||
UNUSEDx(tenant_id, param_ctx);
|
UNUSEDx(param_ctx);
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
ObServerConnectionPool *dblink_pool = NULL;
|
ObServerConnectionPool *dblink_pool = NULL;
|
||||||
|
|
||||||
if (OB_FAIL(get_dblink_pool(dblink_id, dblink_pool))) {
|
if (OB_FAIL(get_dblink_pool(tenant_id, dblink_id, dblink_pool))) {
|
||||||
LOG_WARN("fail to get dblink connection pool", K(dblink_id));
|
LOG_WARN("fail to get dblink connection pool", K(dblink_id));
|
||||||
} else if (OB_NOT_NULL(dblink_pool)) {
|
} else if (OB_NOT_NULL(dblink_pool)) {
|
||||||
// nothing.
|
// nothing.
|
||||||
@ -770,14 +770,14 @@ int ObMySQLConnectionPool::create_dblink_pool(uint64_t tenant_id, uint64_t dblin
|
|||||||
// can not use obsys::ObWLockGuard lock(get_lock_), cause it will have dead lock
|
// can not use obsys::ObWLockGuard lock(get_lock_), cause it will have dead lock
|
||||||
// use a new lock for create_dblink_pool
|
// use a new lock for create_dblink_pool
|
||||||
obsys::ObWLockGuard lock(dblink_pool_lock_);
|
obsys::ObWLockGuard lock(dblink_pool_lock_);
|
||||||
if (OB_FAIL(get_dblink_pool(dblink_id, dblink_pool))) { //get again
|
if (OB_FAIL(get_dblink_pool(tenant_id, dblink_id, dblink_pool))) { //get again
|
||||||
LOG_WARN("fail to get dblink connection pool", K(dblink_id));
|
LOG_WARN("fail to get dblink connection pool", K(dblink_id));
|
||||||
} else if (OB_NOT_NULL(dblink_pool)) {
|
} else if (OB_NOT_NULL(dblink_pool)) {
|
||||||
// nothing.
|
// nothing.
|
||||||
} else if (OB_ISNULL(dblink_pool = server_pool_.alloc())) {
|
} else if (OB_ISNULL(dblink_pool = server_pool_.alloc())) {
|
||||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||||
LOG_ERROR("out of memory", K(ret));
|
LOG_ERROR("out of memory", K(ret));
|
||||||
} else if (OB_FAIL(dblink_pool->init_dblink(dblink_id, server, db_tenant, db_user, db_pass,
|
} else if (OB_FAIL(dblink_pool->init_dblink(tenant_id, dblink_id, server, db_tenant, db_user, db_pass,
|
||||||
db_name, conn_str, cluster_str,
|
db_name, conn_str, cluster_str,
|
||||||
this, config_.sqlclient_per_observer_conn_limit_))) {
|
this, config_.sqlclient_per_observer_conn_limit_))) {
|
||||||
LOG_WARN("fail to init dblink connection pool", K(ret));
|
LOG_WARN("fail to init dblink connection pool", K(ret));
|
||||||
@ -794,10 +794,13 @@ int ObMySQLConnectionPool::create_dblink_pool(uint64_t tenant_id, uint64_t dblin
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
int ObMySQLConnectionPool::acquire_dblink(uint64_t dblink_id, const dblink_param_ctx ¶m_ctx, ObISQLConnection *&dblink_conn, uint32_t sessid, int64_t sql_request_level)
|
int ObMySQLConnectionPool::acquire_dblink(uint64_t tenant_id, uint64_t dblink_id,
|
||||||
|
const dblink_param_ctx ¶m_ctx,
|
||||||
|
ObISQLConnection *&dblink_conn,
|
||||||
|
uint32_t sessid, int64_t sql_request_level)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
if (OB_FAIL(do_acquire_dblink(dblink_id, param_ctx, dblink_conn, sessid))) {
|
if (OB_FAIL(do_acquire_dblink(tenant_id, dblink_id, param_ctx, dblink_conn, sessid))) {
|
||||||
LOG_WARN("fail to acquire dblink", K(ret), K(dblink_id));
|
LOG_WARN("fail to acquire dblink", K(ret), K(dblink_id));
|
||||||
} else if (OB_FAIL(try_connect_dblink(dblink_conn, sql_request_level))) {
|
} else if (OB_FAIL(try_connect_dblink(dblink_conn, sql_request_level))) {
|
||||||
LOG_WARN("fail to try connect dblink", K(ret), K(dblink_id));
|
LOG_WARN("fail to try connect dblink", K(ret), K(dblink_id));
|
||||||
@ -820,7 +823,8 @@ int ObMySQLConnectionPool::release_dblink(ObISQLConnection *dblink_conn, uint32_
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
int ObMySQLConnectionPool::get_dblink_pool(uint64_t dblink_id, ObServerConnectionPool *&dblink_pool)
|
int ObMySQLConnectionPool::get_dblink_pool(uint64_t tenant_id, uint64_t dblink_id,
|
||||||
|
ObServerConnectionPool *&dblink_pool)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
ObServerConnectionPool *pool = NULL;
|
ObServerConnectionPool *pool = NULL;
|
||||||
@ -830,7 +834,7 @@ int ObMySQLConnectionPool::get_dblink_pool(uint64_t dblink_id, ObServerConnectio
|
|||||||
if (OB_ISNULL(pool = *iter)) {
|
if (OB_ISNULL(pool = *iter)) {
|
||||||
ret = OB_ERR_UNEXPECTED;
|
ret = OB_ERR_UNEXPECTED;
|
||||||
LOG_WARN("pool is null", K(ret));
|
LOG_WARN("pool is null", K(ret));
|
||||||
} else if (dblink_id == pool->get_dblink_id()) {
|
} else if (dblink_id == pool->get_dblink_id() && pool->get_tenant_id() == tenant_id) {
|
||||||
dblink_pool = pool;
|
dblink_pool = pool;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
@ -838,13 +842,15 @@ int ObMySQLConnectionPool::get_dblink_pool(uint64_t dblink_id, ObServerConnectio
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
int ObMySQLConnectionPool::do_acquire_dblink(uint64_t dblink_id, const dblink_param_ctx ¶m_ctx, ObISQLConnection *&dblink_conn, uint32_t sessid)
|
int ObMySQLConnectionPool::do_acquire_dblink(uint64_t tenant_id, uint64_t dblink_id,
|
||||||
|
const dblink_param_ctx ¶m_ctx,
|
||||||
|
ObISQLConnection *&dblink_conn, uint32_t sessid)
|
||||||
{
|
{
|
||||||
UNUSED(param_ctx);
|
UNUSED(param_ctx);
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
ObServerConnectionPool *dblink_pool = NULL;
|
ObServerConnectionPool *dblink_pool = NULL;
|
||||||
ObMySQLConnection *dblink_conn1 = NULL;
|
ObMySQLConnection *dblink_conn1 = NULL;
|
||||||
if (OB_FAIL(get_dblink_pool(dblink_id, dblink_pool))) {
|
if (OB_FAIL(get_dblink_pool(tenant_id, dblink_id, dblink_pool))) {
|
||||||
LOG_WARN("failed to get dblink pool", K(ret), K(dblink_id));
|
LOG_WARN("failed to get dblink pool", K(ret), K(dblink_id));
|
||||||
} else if (OB_ISNULL(dblink_pool)) {
|
} else if (OB_ISNULL(dblink_pool)) {
|
||||||
ret = OB_ERR_UNEXPECTED;
|
ret = OB_ERR_UNEXPECTED;
|
||||||
@ -1054,6 +1060,41 @@ int ObMySQLConnectionPool::purge_tenant_server_pool_map_(const ObIArray<uint64_t
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int ObMySQLConnectionPool::clean_dblink_connection(uint64_t tenant_id)
|
||||||
|
{
|
||||||
|
int ret = OB_SUCCESS;
|
||||||
|
ObServerConnectionPool *pool = NULL;
|
||||||
|
ObArray<ObList<ObServerConnectionPool *, common::ObArenaAllocator>::iterator > to_delete;
|
||||||
|
for (ServerList::iterator iter = server_list_.begin();
|
||||||
|
OB_SUCC(ret) && iter != server_list_.end(); iter++) {
|
||||||
|
if (OB_ISNULL(pool = *iter)) {
|
||||||
|
ret = OB_ERR_UNEXPECTED;
|
||||||
|
LOG_WARN("pool is null", K(ret));
|
||||||
|
} else if (pool->get_tenant_id() != tenant_id) {
|
||||||
|
} else if (0 == pool->get_busy_count()) {
|
||||||
|
pool->set_server_gone(true);
|
||||||
|
LOG_INFO("dblink mysql server pool removed", K(pool->get_server()), K(pool->get_tenant_id()), K(pool->get_dblink_id()));
|
||||||
|
if (OB_FAIL(to_delete.push_back(iter))) {
|
||||||
|
LOG_WARN("push iter to delete list fail", K(ret));
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
LOG_ERROR("dblink pool busy count should be zero", K(pool->get_server()),
|
||||||
|
K(pool->get_tenant_id()), K(pool->get_dblink_id()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
LOG_INFO("clean dblink mysql server pool", K(to_delete.count()), K(server_list_.size()));
|
||||||
|
if (OB_SUCC(ret)) {
|
||||||
|
for (int i = 0; OB_SUCC(ret) && i < to_delete.count(); i++) {
|
||||||
|
pool = *to_delete.at(i);
|
||||||
|
server_pool_.free(pool);
|
||||||
|
if (OB_FAIL(server_list_.erase(to_delete.at(i)))) {
|
||||||
|
LOG_WARN("fail to delete pool from server_list", K(ret));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
bool ObMySQLConnectionPool::TenantServerConnPoolPurger::operator()(
|
bool ObMySQLConnectionPool::TenantServerConnPoolPurger::operator()(
|
||||||
const TenantMapKey &tenant_key,
|
const TenantMapKey &tenant_key,
|
||||||
const ObTenantServerConnectionPool *tenant_server_pool)
|
const ObTenantServerConnectionPool *tenant_server_pool)
|
||||||
@ -1095,6 +1136,7 @@ bool ObMySQLConnectionPool::TenantServerConnPoolPurger::is_tenant_not_serve_(con
|
|||||||
return ! is_serve;
|
return ! is_serve;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
} // end namespace sqlclient
|
} // end namespace sqlclient
|
||||||
} // end namespace common
|
} // end namespace common
|
||||||
} // end namespace oceanbase
|
} // end namespace oceanbase
|
||||||
|
|||||||
@ -162,19 +162,20 @@ public:
|
|||||||
const common::ObString &conn_str,
|
const common::ObString &conn_str,
|
||||||
const common::ObString &cluster_str,
|
const common::ObString &cluster_str,
|
||||||
const dblink_param_ctx ¶m_ctx);
|
const dblink_param_ctx ¶m_ctx);
|
||||||
virtual int acquire_dblink(uint64_t dblink_id,
|
virtual int acquire_dblink(uint64_t tenant_id, uint64_t dblink_id,
|
||||||
const dblink_param_ctx ¶m_ctx,
|
const dblink_param_ctx ¶m_ctx,
|
||||||
ObISQLConnection *&dblink_conn,
|
ObISQLConnection *&dblink_conn,
|
||||||
uint32_t sessid = 0, int64_t
|
uint32_t sessid = 0, int64_t
|
||||||
sql_request_level = 0);
|
sql_request_level = 0);
|
||||||
virtual int release_dblink(ObISQLConnection *dblink_conn, uint32_t sessid = 0);
|
virtual int release_dblink(ObISQLConnection *dblink_conn, uint32_t sessid = 0);
|
||||||
virtual int do_acquire_dblink(uint64_t dblink_id,
|
virtual int do_acquire_dblink(uint64_t tenant_id, uint64_t dblink_id,
|
||||||
const dblink_param_ctx ¶m_ctx,
|
const dblink_param_ctx ¶m_ctx,
|
||||||
ObISQLConnection *&dblink_conn,
|
ObISQLConnection *&dblink_conn,
|
||||||
uint32_t sessid);
|
uint32_t sessid);
|
||||||
virtual int try_connect_dblink(ObISQLConnection *dblink_conn, int64_t sql_request_level = 0);
|
virtual int try_connect_dblink(ObISQLConnection *dblink_conn, int64_t sql_request_level = 0);
|
||||||
int get_dblink_pool(uint64_t dblink_id, ObServerConnectionPool *&dblink_pool);
|
int get_dblink_pool(uint64_t tenant_id, uint64_t dblink_id, ObServerConnectionPool *&dblink_pool);
|
||||||
void set_check_read_consistency(bool need_check) { check_read_consistency_ = need_check; }
|
void set_check_read_consistency(bool need_check) { check_read_consistency_ = need_check; }
|
||||||
|
virtual int clean_dblink_connection(uint64_t tenant_id);
|
||||||
protected:
|
protected:
|
||||||
// update interval.
|
// update interval.
|
||||||
// update ms list in backgroud thread and
|
// update ms list in backgroud thread and
|
||||||
|
|||||||
@ -446,7 +446,7 @@ int ObDbLinkProxy::create_dblink_pool(uint64_t tenant_id, uint64_t dblink_id, Db
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
int ObDbLinkProxy::acquire_dblink(uint64_t dblink_id,
|
int ObDbLinkProxy::acquire_dblink(uint64 tenant_id, uint64_t dblink_id,
|
||||||
DblinkDriverProto dblink_type,
|
DblinkDriverProto dblink_type,
|
||||||
const dblink_param_ctx ¶m_ctx,
|
const dblink_param_ctx ¶m_ctx,
|
||||||
ObISQLConnection *&dblink_conn,
|
ObISQLConnection *&dblink_conn,
|
||||||
@ -461,7 +461,8 @@ int ObDbLinkProxy::acquire_dblink(uint64_t dblink_id,
|
|||||||
LOG_WARN("dblink proxy not inited");
|
LOG_WARN("dblink proxy not inited");
|
||||||
} else if (OB_FAIL(switch_dblink_conn_pool(dblink_type, dblink_pool))) {
|
} else if (OB_FAIL(switch_dblink_conn_pool(dblink_type, dblink_pool))) {
|
||||||
LOG_WARN("failed to get dblink interface", K(ret), K(dblink_type));
|
LOG_WARN("failed to get dblink interface", K(ret), K(dblink_type));
|
||||||
} else if (OB_FAIL(dblink_pool->acquire_dblink(dblink_id, param_ctx, dblink_conn, sessid, sql_request_level))) {
|
} else if (OB_FAIL(dblink_pool->acquire_dblink(tenant_id, dblink_id, param_ctx, dblink_conn,
|
||||||
|
sessid, sql_request_level))) {
|
||||||
LOG_WARN("acquire dblink failed", K(ret), K(dblink_id), K(dblink_type));
|
LOG_WARN("acquire dblink failed", K(ret), K(dblink_id), K(dblink_type));
|
||||||
} else if (OB_FAIL(prepare_enviroment(dblink_conn, dblink_type, set_sql_mode_cstr))) {
|
} else if (OB_FAIL(prepare_enviroment(dblink_conn, dblink_type, set_sql_mode_cstr))) {
|
||||||
LOG_WARN("failed to prepare dblink env", K(ret));
|
LOG_WARN("failed to prepare dblink env", K(ret));
|
||||||
@ -596,3 +597,18 @@ int ObDbLinkProxy::rollback(ObISQLConnection *dblink_conn)
|
|||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
int ObDbLinkProxy::clean_dblink_connection(uint64_t tenant_id)
|
||||||
|
{
|
||||||
|
int ret = OB_SUCCESS;
|
||||||
|
if (OB_ISNULL(link_pool_)) {
|
||||||
|
ret = OB_ERR_UNEXPECTED;
|
||||||
|
LOG_WARN("failed to switch dblink conn pool", K(ret));
|
||||||
|
} else {
|
||||||
|
if (OB_FAIL(link_pool_->get_mysql_pool().clean_dblink_connection(tenant_id))) {
|
||||||
|
LOG_WARN("clean mysql pool failed", K(ret));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
@ -172,7 +172,7 @@ public:
|
|||||||
const common::ObString &conn_str,
|
const common::ObString &conn_str,
|
||||||
const common::ObString &cluster_str,
|
const common::ObString &cluster_str,
|
||||||
const sqlclient::dblink_param_ctx ¶m_ctx);
|
const sqlclient::dblink_param_ctx ¶m_ctx);
|
||||||
int acquire_dblink(uint64_t dblink_id,
|
int acquire_dblink(uint64 tenant_id, uint64_t dblink_id,
|
||||||
sqlclient::DblinkDriverProto dblink_type,
|
sqlclient::DblinkDriverProto dblink_type,
|
||||||
const sqlclient::dblink_param_ctx ¶m_ctx,
|
const sqlclient::dblink_param_ctx ¶m_ctx,
|
||||||
sqlclient::ObISQLConnection *&dblink_conn,
|
sqlclient::ObISQLConnection *&dblink_conn,
|
||||||
@ -186,6 +186,7 @@ public:
|
|||||||
int switch_dblink_conn_pool(sqlclient::DblinkDriverProto type, sqlclient::ObISQLConnectionPool *&dblink_conn_pool);
|
int switch_dblink_conn_pool(sqlclient::DblinkDriverProto type, sqlclient::ObISQLConnectionPool *&dblink_conn_pool);
|
||||||
int set_dblink_pool_charset(uint64_t dblink_id);
|
int set_dblink_pool_charset(uint64_t dblink_id);
|
||||||
inline sqlclient::ObDbLinkConnectionPool *get_dblink_conn_pool() { return link_pool_; }
|
inline sqlclient::ObDbLinkConnectionPool *get_dblink_conn_pool() { return link_pool_; }
|
||||||
|
int clean_dblink_connection(uint64_t tenant_id);
|
||||||
static int execute_init_sql(sqlclient::ObISQLConnection *dblink_conn, int link_type,
|
static int execute_init_sql(sqlclient::ObISQLConnection *dblink_conn, int link_type,
|
||||||
const char *set_sql_mode_cstr = NULL);
|
const char *set_sql_mode_cstr = NULL);
|
||||||
private:
|
private:
|
||||||
|
|||||||
@ -184,7 +184,7 @@ void ObServerConnectionPool::dump()
|
|||||||
"max_allowed_conn_count_", max_allowed_conn_count_, "server_not_available_", server_not_available_);
|
"max_allowed_conn_count_", max_allowed_conn_count_, "server_not_available_", server_not_available_);
|
||||||
}
|
}
|
||||||
|
|
||||||
int ObServerConnectionPool::init_dblink(uint64_t dblink_id, const ObAddr &server,
|
int ObServerConnectionPool::init_dblink(uint64_t tenant_id, uint64_t dblink_id, const ObAddr &server,
|
||||||
const ObString &db_tenant, const ObString &db_user,
|
const ObString &db_tenant, const ObString &db_user,
|
||||||
const ObString &db_pass, const ObString &db_name,
|
const ObString &db_pass, const ObString &db_name,
|
||||||
const common::ObString &conn_str,
|
const common::ObString &conn_str,
|
||||||
@ -208,6 +208,7 @@ int ObServerConnectionPool::init_dblink(uint64_t dblink_id, const ObAddr &server
|
|||||||
K(dblink_id), K(db_tenant), K(db_user), K(db_pass), K(db_name));
|
K(dblink_id), K(db_tenant), K(db_user), K(db_pass), K(db_name));
|
||||||
} else {
|
} else {
|
||||||
dblink_id_ = dblink_id;
|
dblink_id_ = dblink_id;
|
||||||
|
tenant_id_ = tenant_id;
|
||||||
if (cluster_str.empty()) {
|
if (cluster_str.empty()) {
|
||||||
(void)snprintf(db_user_, sizeof(db_user_), "%.*s@%.*s", db_user.length(), db_user.ptr(),
|
(void)snprintf(db_user_, sizeof(db_user_), "%.*s@%.*s", db_user.length(), db_user.ptr(),
|
||||||
db_tenant.length(), db_tenant.ptr());
|
db_tenant.length(), db_tenant.ptr());
|
||||||
|
|||||||
@ -43,8 +43,7 @@ public:
|
|||||||
int64_t last_renew_time(void) const;
|
int64_t last_renew_time(void) const;
|
||||||
void reset_idle_conn_to_sys_tenant();
|
void reset_idle_conn_to_sys_tenant();
|
||||||
void set_server_gone(bool gone);
|
void set_server_gone(bool gone);
|
||||||
const char *get_db_user() const;
|
const char *get_db_user() const; const char *get_db_pass() const;
|
||||||
const char *get_db_pass() const;
|
|
||||||
const char *get_db_name() const;
|
const char *get_db_name() const;
|
||||||
common::ObAddr &get_server();
|
common::ObAddr &get_server();
|
||||||
ObMySQLConnectionPool *get_root();
|
ObMySQLConnectionPool *get_root();
|
||||||
@ -54,13 +53,15 @@ public:
|
|||||||
K_(free_conn_count),
|
K_(free_conn_count),
|
||||||
K_(busy_conn_count));
|
K_(busy_conn_count));
|
||||||
// dblink.
|
// dblink.
|
||||||
int init_dblink(uint64_t dblink_id, const ObAddr &server,
|
int init_dblink(uint64_t tenant_id, uint64_t dblink_id, const ObAddr &server,
|
||||||
const ObString &db_tenant, const ObString &db_user,
|
const ObString &db_tenant, const ObString &db_user,
|
||||||
const ObString &db_pass, const ObString &db_name,
|
const ObString &db_pass, const ObString &db_name,
|
||||||
const common::ObString &conn_str,
|
const common::ObString &conn_str,
|
||||||
const common::ObString &cluster_str,
|
const common::ObString &cluster_str,
|
||||||
ObMySQLConnectionPool *root, int64_t max_allowed_conn_count);
|
ObMySQLConnectionPool *root, int64_t max_allowed_conn_count);
|
||||||
uint64_t get_dblink_id() const;
|
uint64_t get_dblink_id() const;
|
||||||
|
uint64_t get_tenant_id() const { return tenant_id_; }
|
||||||
|
void set_tenant_id(uint64_t v) { tenant_id_ = v; }
|
||||||
int free_dblink_session(uint32_t sessid) override;
|
int free_dblink_session(uint32_t sessid) override;
|
||||||
private:
|
private:
|
||||||
ObSimpleConnectionAllocator<ObMySQLConnection> connection_pool_;
|
ObSimpleConnectionAllocator<ObMySQLConnection> connection_pool_;
|
||||||
@ -68,6 +69,7 @@ private:
|
|||||||
ObIConnectionAllocator<ObMySQLConnection> *connection_pool_ptr_;
|
ObIConnectionAllocator<ObMySQLConnection> *connection_pool_ptr_;
|
||||||
ObMySQLConnectionPool *root_;
|
ObMySQLConnectionPool *root_;
|
||||||
uint64_t dblink_id_;
|
uint64_t dblink_id_;
|
||||||
|
uint64_t tenant_id_;
|
||||||
char db_user_[OB_MAX_USER_NAME_LENGTH + OB_MAX_TENANT_NAME_LENGTH + OB_MAX_CLUSTER_NAME_LENGTH + 1];
|
char db_user_[OB_MAX_USER_NAME_LENGTH + OB_MAX_TENANT_NAME_LENGTH + OB_MAX_CLUSTER_NAME_LENGTH + 1];
|
||||||
char db_pass_[OB_MAX_PASSWORD_LENGTH];
|
char db_pass_[OB_MAX_PASSWORD_LENGTH];
|
||||||
char db_name_[OB_MAX_DATABASE_NAME_LENGTH];
|
char db_name_[OB_MAX_DATABASE_NAME_LENGTH];
|
||||||
|
|||||||
@ -92,14 +92,15 @@ public:
|
|||||||
const common::ObString &cluster_str,
|
const common::ObString &cluster_str,
|
||||||
const common::sqlclient::dblink_param_ctx ¶m_ctx) override
|
const common::sqlclient::dblink_param_ctx ¶m_ctx) override
|
||||||
{ UNUSEDx(tenant_id, dblink_id, server, db_tenant, db_user, db_pass, db_name, conn_str, param_ctx); return OB_SUCCESS; }
|
{ UNUSEDx(tenant_id, dblink_id, server, db_tenant, db_user, db_pass, db_name, conn_str, param_ctx); return OB_SUCCESS; }
|
||||||
virtual int acquire_dblink(uint64_t dblink_id, const sqlclient::dblink_param_ctx ¶m_ctx, common::sqlclient::ObISQLConnection *&dblink_conn, uint32_t sessid = 0, int64_t timeout_sec = 0)
|
virtual int acquire_dblink(uint64_t tenant_id, uint64_t dblink_id, const sqlclient::dblink_param_ctx ¶m_ctx, common::sqlclient::ObISQLConnection *&dblink_conn, uint32_t sessid = 0, int64_t timeout_sec = 0)
|
||||||
{ UNUSEDx(dblink_id, param_ctx, dblink_conn, sessid, timeout_sec); return OB_SUCCESS; }
|
{ UNUSEDx(tenant_id, dblink_id, param_ctx, dblink_conn, sessid, timeout_sec); return OB_SUCCESS; }
|
||||||
virtual int release_dblink(common::sqlclient::ObISQLConnection *dblink_conn, uint32_t sessid = 0)
|
virtual int release_dblink(common::sqlclient::ObISQLConnection *dblink_conn, uint32_t sessid = 0)
|
||||||
{ UNUSEDx(dblink_conn, sessid); return OB_SUCCESS; }
|
{ UNUSEDx(dblink_conn, sessid); return OB_SUCCESS; }
|
||||||
virtual int do_acquire_dblink(uint64_t dblink_id, const sqlclient::dblink_param_ctx ¶m_ctx, common::sqlclient::ObISQLConnection *&dblink_conn, uint32_t sessid = 0)
|
virtual int do_acquire_dblink(uint64_t tenant_id, uint64_t dblink_id, const sqlclient::dblink_param_ctx ¶m_ctx, common::sqlclient::ObISQLConnection *&dblink_conn, uint32_t sessid = 0)
|
||||||
{ UNUSEDx(dblink_id, param_ctx, dblink_conn, sessid); return OB_SUCCESS; }
|
{ UNUSEDx(tenant_id, dblink_id, param_ctx, dblink_conn, sessid); return OB_SUCCESS; }
|
||||||
virtual int try_connect_dblink(common::sqlclient::ObISQLConnection *dblink_conn, int64_t timeout_sec = 0) { UNUSEDx(dblink_conn, timeout_sec); return OB_SUCCESS; }
|
virtual int try_connect_dblink(common::sqlclient::ObISQLConnection *dblink_conn, int64_t timeout_sec = 0) { UNUSEDx(dblink_conn, timeout_sec); return OB_SUCCESS; }
|
||||||
|
virtual int clean_dblink_connection(uint64_t tenant_id)
|
||||||
|
{ UNUSED(tenant_id); return OB_SUCCESS; }
|
||||||
void dump_used_conn_list();
|
void dump_used_conn_list();
|
||||||
|
|
||||||
// Dozens of connections may acquired by one worker in oracle mode, because all sys tables
|
// Dozens of connections may acquired by one worker in oracle mode, because all sys tables
|
||||||
|
|||||||
@ -1554,7 +1554,11 @@ int ObMultiTenant::remove_tenant(const uint64_t tenant_id, bool &remove_tenant_s
|
|||||||
LOG_WARN("failed to erase_tenant_interm_result_info", K(ret), K(tenant_id));
|
LOG_WARN("failed to erase_tenant_interm_result_info", K(ret), K(tenant_id));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if (OB_SUCC(ret) && OB_NOT_NULL(GCTX.dblink_proxy_)) {
|
||||||
|
if (OB_FAIL(GCTX.dblink_proxy_->clean_dblink_connection(tenant_id))) {
|
||||||
|
LOG_WARN("failed to clean dblink connection", K(ret), K(tenant_id));
|
||||||
|
}
|
||||||
|
}
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -8436,7 +8436,7 @@ int ObSchemaServiceSQLImpl::fetch_link_table_info(uint64_t tenant_id,
|
|||||||
}
|
}
|
||||||
} else if (!is_oracle_mode() && OB_FAIL(ObDblinkService::get_set_sql_mode_cstr(session_info, set_sql_mode_sql, alloctor))) {
|
} else if (!is_oracle_mode() && OB_FAIL(ObDblinkService::get_set_sql_mode_cstr(session_info, set_sql_mode_sql, alloctor))) {
|
||||||
LOG_WARN("failed to get set_sql_mode_sql", K(ret));
|
LOG_WARN("failed to get set_sql_mode_sql", K(ret));
|
||||||
} else if (OB_FAIL(dblink_proxy_->acquire_dblink(dblink_id, link_type, param_ctx,
|
} else if (OB_FAIL(dblink_proxy_->acquire_dblink(tenant_id, dblink_id, link_type, param_ctx,
|
||||||
dblink_conn,
|
dblink_conn,
|
||||||
session_info->get_sessid(),
|
session_info->get_sessid(),
|
||||||
next_sql_req_level,
|
next_sql_req_level,
|
||||||
|
|||||||
@ -149,7 +149,8 @@ int ObLinkOp::init_dblink(uint64_t dblink_id, ObDbLinkProxy *dblink_proxy, bool
|
|||||||
const char *set_sql_mode_sql = NULL;
|
const char *set_sql_mode_sql = NULL;
|
||||||
if (!is_oracle_mode() && OB_FAIL(ObDblinkService::get_set_sql_mode_cstr(my_session, set_sql_mode_sql, allocator_))) {
|
if (!is_oracle_mode() && OB_FAIL(ObDblinkService::get_set_sql_mode_cstr(my_session, set_sql_mode_sql, allocator_))) {
|
||||||
LOG_WARN("failed to get set_sql_mode_sql", K(ret));
|
LOG_WARN("failed to get set_sql_mode_sql", K(ret));
|
||||||
} else if (OB_FAIL(dblink_proxy->acquire_dblink(dblink_id,
|
} else if (OB_FAIL(dblink_proxy->acquire_dblink(tenant_id_,
|
||||||
|
dblink_id,
|
||||||
link_type_,
|
link_type_,
|
||||||
param_ctx,
|
param_ctx,
|
||||||
dblink_conn_,
|
dblink_conn_,
|
||||||
|
|||||||
Reference in New Issue
Block a user