[CP] implement dblink reuse connection cfg

This commit is contained in:
obdev
2024-02-09 22:59:11 +00:00
committed by ob-robot
parent f6681accc1
commit a34e69a914
7 changed files with 45 additions and 14 deletions

View File

@ -376,6 +376,7 @@ int ObLruConnectionAllocator<T>::free_session_conn_array(uint32_t sessid, int64_
_OB_LOG(DEBUG, "try to free conn_array, conn_array=%p, count=%ld", conn_array, array_size);
while (OB_SUCC(ret) && OB_SUCCESS == conn_array->pop_back(conn)) {
conn->close(); //close immedately
_OB_LOG(TRACE, "close connection, conn=%p", conn);
if (OB_FAIL(free_conn_array_.push_back(conn))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
_OB_LOG(WARN, "push_back conn to free_conn_array_ failed, sessid=%u, ret=%d", sessid, ret);
@ -390,6 +391,7 @@ int ObLruConnectionAllocator<T>::free_session_conn_array(uint32_t sessid, int64_
fail_recycled_conn_count = array_size - succ_count;
while (OB_SUCCESS == conn_array->pop_back(conn)) {
conn->close(); //close immedately
_OB_LOG(TRACE, "close connection, conn=%p", conn);
ObLruConnectionAllocator<T>::free(conn);
}
if (OB_SUCCESS != sessionid_to_conns_map_.erase_refactored(sessid)) {
@ -465,9 +467,11 @@ int ObLruConnectionAllocator<T>::get_cached(T *&conn, uint32_t sessid)
} else if (OB_FAIL(conn_array_erase(conn_array, conn))) {
ObLruConnectionAllocator<T>::free(conn);
_OB_LOG(WARN, "failed to erase conn from conn_array, other_sessid=%u, sessid=%u, ret=%d", other_sessid, sessid, ret);
} else if (!is_session_share_conns_) { // get conn from other seesion, need close it.
conn->close();
_OB_LOG(TRACE, "get conn from other seesion, need close it, other_sessid=%u, sessid=%u, ret=%d", other_sessid, sessid, ret);
} else if (!is_session_share_conns_) {
conn->set_sessid(0);
// get conn from other seesion, set sessid as 0,
// and will close it at ObServerConnectionPool::acquire() and int ObOciServerConnectionPool::acquire(ObOciConnection *&conn, uint32_t sessid).
_OB_LOG(TRACE, "get conn from other seesion, other_sessid=%u, sessid=%u, ret=%d", other_sessid, sessid, ret);
}
} else if (NULL == conn) {
ret = OB_ERR_UNEXPECTED;

View File

@ -34,6 +34,10 @@ const char* __attribute__((weak)) get_mysql_str_error(int index)
return NULL;
}
bool __attribute__((weak)) get_dblink_reuse_connection_cfg()
{
return true;
}
namespace oceanbase
{

View File

@ -15,6 +15,8 @@
#include "lib/utility/ob_edit_distance.h"
#include "lib/ob_errno.h"
extern bool get_dblink_reuse_connection_cfg();
namespace oceanbase
{
namespace common

View File

@ -88,17 +88,20 @@ int ObServerConnectionPool::acquire(ObMySQLConnection *&conn, uint32_t sessid)
}
if (OB_SUCC(ret)) {
conn = connection;
conn->set_sessid(sessid);
if (conn->connection_version() != connection_version_) {
conn->set_connection_version(connection_version_);
conn->close();
} else if (sessid != conn->get_sessid()) {
LOG_TRACE("get connection from other session, close it", K(ret), K(sessid), K(conn->get_sessid()));
conn->close();
} else if (false == conn->is_closed()) {
if (OB_SUCCESS != conn->ping()) {
conn->close();
}
}
conn->set_sessid(sessid);
}
LOG_TRACE("acquire connection from server conn pool", KP(this), K(busy_conn_count_), K(free_conn_count_), KP(connection), K(ret), K(lbt()));
LOG_TRACE("acquire connection from server conn pool", KP(this), K(busy_conn_count_), K(free_conn_count_), KP(connection), K(ret), K(sessid), K(lbt()));
return ret;
}
@ -111,14 +114,20 @@ int ObServerConnectionPool::release(common::sqlclient::ObISQLConnection *conn, c
LOG_WARN("invalid connection", K(connection), K(ret));
} else {
connection->set_busy(false);
ObSpinLockGuard lock(pool_lock_);
if (succ) {
connection->succ_times_++;
if (!get_dblink_reuse_connection_cfg()) {
connection->close();
LOG_TRACE("close dblink connection when release it", K(ret), K(succ), KP(conn));
}
} else {
LOG_TRACE("release! err", K(succ));
LOG_TRACE("release oci connection, close it caused by err", K(succ));
connection->error_times_++;
connection->close();
}
{
// Do not perform any operations on the network after thread holding a lock
ObSpinLockGuard lock(pool_lock_);
if (OB_FAIL(connection_pool_ptr_->put_cached(connection, connection->get_sessid()))) {
ATOMIC_DEC(&busy_conn_count_);
LOG_WARN("connection object failed to put to cache. destroyed", K(ret));
@ -127,6 +136,7 @@ int ObServerConnectionPool::release(common::sqlclient::ObISQLConnection *conn, c
ATOMIC_INC(&free_conn_count_);
}
}
}
LOG_TRACE("release connection to server conn pool", KP(this),
K(busy_conn_count_),
K(free_conn_count_),

View File

@ -1679,7 +1679,9 @@ DEF_STR_WITH_CHECKER(rpc_server_authentication_method, OB_CLUSTER_PARAMETER, "AL
DEF_BOOL(_enable_backtrace_function, OB_CLUSTER_PARAMETER, "True",
"Decide whether to let the backtrace function take effect",
ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
DEF_BOOL(_enable_dblink_reuse_connection, OB_TENANT_PARAMETER, "True",
"specifies whether dblink reuse connection in a session",
ObParameterAttr(Section::TENANT, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
DEF_INT(_with_subquery, OB_TENANT_PARAMETER, "0", "[0,2]",
"WITH subquery transformation,0: optimizer,1: materialize,2: inline",
ObParameterAttr(Section::TENANT, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));

View File

@ -30,6 +30,14 @@ using namespace oceanbase::sql;
using namespace oceanbase::share;
#ifdef OB_BUILD_DBLINK
bool get_dblink_reuse_connection_cfg()
{
omt::ObTenantConfigGuard tenant_config(TENANT_CONF(MTL_ID()));
// default value of _enable_dblink_reuse_connection is true, if !tenant_config.is_valid() return true
return tenant_config.is_valid() ? tenant_config->_enable_dblink_reuse_connection : true;
}
uint64_t ObDblinkService::get_current_tenant_id()
{
return MTL_ID();

View File

@ -279,6 +279,7 @@ _enable_block_file_punch_hole
_enable_column_store
_enable_compaction_diagnose
_enable_convert_real_to_decimal
_enable_dblink_reuse_connection
_enable_decimal_int_type
_enable_defensive_check
_enable_easy_keepalive