From a748a6fa93a730eb4105defeaba345c9dca5be53 Mon Sep 17 00:00:00 2001 From: cqliang1995 Date: Tue, 23 Jan 2024 06:43:00 +0000 Subject: [PATCH] [CP] implement dblink reuse connection cfg --- .../lib/mysqlclient/ob_connection_allocator.h | 10 +++++-- .../lib/mysqlclient/ob_dblink_error_trans.cpp | 4 +++ .../lib/mysqlclient/ob_dblink_error_trans.h | 2 ++ .../mysqlclient/ob_server_connection_pool.cpp | 30 ++++++++++++------- src/share/parameter/ob_parameter_seed.ipp | 4 ++- src/sql/dblink/ob_dblink_utils.cpp | 8 +++++ .../all_virtual_sys_parameter_stat.result | 1 + 7 files changed, 45 insertions(+), 14 deletions(-) diff --git a/deps/oblib/src/lib/mysqlclient/ob_connection_allocator.h b/deps/oblib/src/lib/mysqlclient/ob_connection_allocator.h index 9595b30a74..51177d92ff 100644 --- a/deps/oblib/src/lib/mysqlclient/ob_connection_allocator.h +++ b/deps/oblib/src/lib/mysqlclient/ob_connection_allocator.h @@ -376,6 +376,7 @@ int ObLruConnectionAllocator::free_session_conn_array(uint32_t sessid, int64_ _OB_LOG(DEBUG, "try to free conn_array, conn_array=%p, count=%ld", conn_array, array_size); while (OB_SUCC(ret) && OB_SUCCESS == conn_array->pop_back(conn)) { conn->close(); //close immedately + _OB_LOG(TRACE, "close connection, conn=%p", conn); if (OB_FAIL(free_conn_array_.push_back(conn))) { ret = OB_ALLOCATE_MEMORY_FAILED; _OB_LOG(WARN, "push_back conn to free_conn_array_ failed, sessid=%u, ret=%d", sessid, ret); @@ -390,6 +391,7 @@ int ObLruConnectionAllocator::free_session_conn_array(uint32_t sessid, int64_ fail_recycled_conn_count = array_size - succ_count; while (OB_SUCCESS == conn_array->pop_back(conn)) { conn->close(); //close immedately + _OB_LOG(TRACE, "close connection, conn=%p", conn); ObLruConnectionAllocator::free(conn); } if (OB_SUCCESS != sessionid_to_conns_map_.erase_refactored(sessid)) { @@ -465,9 +467,11 @@ int ObLruConnectionAllocator::get_cached(T *&conn, uint32_t sessid) } else if (OB_FAIL(conn_array_erase(conn_array, conn))) { ObLruConnectionAllocator::free(conn); _OB_LOG(WARN, "failed to erase conn from conn_array, other_sessid=%u, sessid=%u, ret=%d", other_sessid, sessid, ret); - } else if (!is_session_share_conns_) { // get conn from other seesion, need close it. - conn->close(); - _OB_LOG(TRACE, "get conn from other seesion, need close it, other_sessid=%u, sessid=%u, ret=%d", other_sessid, sessid, ret); + } else if (!is_session_share_conns_) { + conn->set_sessid(0); + // get conn from other seesion, set sessid as 0, + // and will close it at ObServerConnectionPool::acquire() and int ObOciServerConnectionPool::acquire(ObOciConnection *&conn, uint32_t sessid). + _OB_LOG(TRACE, "get conn from other seesion, other_sessid=%u, sessid=%u, ret=%d", other_sessid, sessid, ret); } } else if (NULL == conn) { ret = OB_ERR_UNEXPECTED; diff --git a/deps/oblib/src/lib/mysqlclient/ob_dblink_error_trans.cpp b/deps/oblib/src/lib/mysqlclient/ob_dblink_error_trans.cpp index 05886b28c8..5d43044113 100644 --- a/deps/oblib/src/lib/mysqlclient/ob_dblink_error_trans.cpp +++ b/deps/oblib/src/lib/mysqlclient/ob_dblink_error_trans.cpp @@ -34,6 +34,10 @@ const char* __attribute__((weak)) get_mysql_str_error(int index) return NULL; } +bool __attribute__((weak)) get_dblink_reuse_connection_cfg() +{ + return true; +} namespace oceanbase { diff --git a/deps/oblib/src/lib/mysqlclient/ob_dblink_error_trans.h b/deps/oblib/src/lib/mysqlclient/ob_dblink_error_trans.h index c1eeb5f930..f1d2eb50b2 100644 --- a/deps/oblib/src/lib/mysqlclient/ob_dblink_error_trans.h +++ b/deps/oblib/src/lib/mysqlclient/ob_dblink_error_trans.h @@ -15,6 +15,8 @@ #include "lib/utility/ob_edit_distance.h" #include "lib/ob_errno.h" +extern bool get_dblink_reuse_connection_cfg(); + namespace oceanbase { namespace common diff --git a/deps/oblib/src/lib/mysqlclient/ob_server_connection_pool.cpp b/deps/oblib/src/lib/mysqlclient/ob_server_connection_pool.cpp index fbc045e92a..10972389d0 100644 --- a/deps/oblib/src/lib/mysqlclient/ob_server_connection_pool.cpp +++ b/deps/oblib/src/lib/mysqlclient/ob_server_connection_pool.cpp @@ -88,17 +88,20 @@ int ObServerConnectionPool::acquire(ObMySQLConnection *&conn, uint32_t sessid) } if (OB_SUCC(ret)) { conn = connection; - conn->set_sessid(sessid); if (conn->connection_version() != connection_version_) { conn->set_connection_version(connection_version_); conn->close(); + } else if (sessid != conn->get_sessid()) { + LOG_TRACE("get connection from other session, close it", K(ret), K(sessid), K(conn->get_sessid())); + conn->close(); } else if (false == conn->is_closed()) { if (OB_SUCCESS != conn->ping()) { conn->close(); } } + conn->set_sessid(sessid); } - LOG_TRACE("acquire connection from server conn pool", KP(this), K(busy_conn_count_), K(free_conn_count_), KP(connection), K(ret), K(lbt())); + LOG_TRACE("acquire connection from server conn pool", KP(this), K(busy_conn_count_), K(free_conn_count_), KP(connection), K(ret), K(sessid), K(lbt())); return ret; } @@ -111,20 +114,27 @@ int ObServerConnectionPool::release(common::sqlclient::ObISQLConnection *conn, c LOG_WARN("invalid connection", K(connection), K(ret)); } else { connection->set_busy(false); - ObSpinLockGuard lock(pool_lock_); if (succ) { connection->succ_times_++; + if (!get_dblink_reuse_connection_cfg()) { + connection->close(); + LOG_TRACE("close dblink connection when release it", K(ret), K(succ), KP(conn)); + } } else { - LOG_TRACE("release! err", K(succ)); + LOG_TRACE("release oci connection, close it caused by err", K(succ)); connection->error_times_++; connection->close(); } - if (OB_FAIL(connection_pool_ptr_->put_cached(connection, connection->get_sessid()))) { - ATOMIC_DEC(&busy_conn_count_); - LOG_WARN("connection object failed to put to cache. destroyed", K(ret)); - } else { - ATOMIC_DEC(&busy_conn_count_); - ATOMIC_INC(&free_conn_count_); + { + // Do not perform any operations on the network after thread holding a lock + ObSpinLockGuard lock(pool_lock_); + if (OB_FAIL(connection_pool_ptr_->put_cached(connection, connection->get_sessid()))) { + ATOMIC_DEC(&busy_conn_count_); + LOG_WARN("connection object failed to put to cache. destroyed", K(ret)); + } else { + ATOMIC_DEC(&busy_conn_count_); + ATOMIC_INC(&free_conn_count_); + } } } LOG_TRACE("release connection to server conn pool", KP(this), diff --git a/src/share/parameter/ob_parameter_seed.ipp b/src/share/parameter/ob_parameter_seed.ipp index 9d2e3bac60..159b98a8f7 100644 --- a/src/share/parameter/ob_parameter_seed.ipp +++ b/src/share/parameter/ob_parameter_seed.ipp @@ -1679,7 +1679,9 @@ DEF_STR_WITH_CHECKER(rpc_server_authentication_method, OB_CLUSTER_PARAMETER, "AL DEF_BOOL(_enable_backtrace_function, OB_CLUSTER_PARAMETER, "True", "Decide whether to let the backtrace function take effect", ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE)); - +DEF_BOOL(_enable_dblink_reuse_connection, OB_TENANT_PARAMETER, "True", + "specifies whether dblink reuse connection in a session", + ObParameterAttr(Section::TENANT, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE)); DEF_INT(_with_subquery, OB_TENANT_PARAMETER, "0", "[0,2]", "WITH subquery transformation,0: optimizer,1: materialize,2: inline", ObParameterAttr(Section::TENANT, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE)); diff --git a/src/sql/dblink/ob_dblink_utils.cpp b/src/sql/dblink/ob_dblink_utils.cpp index e43ee65c37..3478271fe6 100644 --- a/src/sql/dblink/ob_dblink_utils.cpp +++ b/src/sql/dblink/ob_dblink_utils.cpp @@ -30,6 +30,14 @@ using namespace oceanbase::sql; using namespace oceanbase::share; #ifdef OB_BUILD_DBLINK + +bool get_dblink_reuse_connection_cfg() +{ + omt::ObTenantConfigGuard tenant_config(TENANT_CONF(MTL_ID())); + // default value of _enable_dblink_reuse_connection is true, if !tenant_config.is_valid() return true + return tenant_config.is_valid() ? tenant_config->_enable_dblink_reuse_connection : true; +} + uint64_t ObDblinkService::get_current_tenant_id() { return MTL_ID(); diff --git a/tools/deploy/mysql_test/test_suite/inner_table/r/mysql/all_virtual_sys_parameter_stat.result b/tools/deploy/mysql_test/test_suite/inner_table/r/mysql/all_virtual_sys_parameter_stat.result index 53c2d52f4e..20b94f126f 100644 --- a/tools/deploy/mysql_test/test_suite/inner_table/r/mysql/all_virtual_sys_parameter_stat.result +++ b/tools/deploy/mysql_test/test_suite/inner_table/r/mysql/all_virtual_sys_parameter_stat.result @@ -279,6 +279,7 @@ _enable_block_file_punch_hole _enable_column_store _enable_compaction_diagnose _enable_convert_real_to_decimal +_enable_dblink_reuse_connection _enable_decimal_int_type _enable_defensive_check _enable_easy_keepalive