修复dblink sequence长、短连接表现不同的问题

This commit is contained in:
zzg19950727
2024-03-05 13:15:09 +00:00
committed by ob-robot
parent 5a32671bdb
commit 5f17ceeab2
2 changed files with 31 additions and 25 deletions

View File

@ -207,7 +207,6 @@ int ObRemoteSequenceExecutor::init_dblink_connection(ObExecContext &ctx)
{
int ret = OB_SUCCESS;
ObSQLSessionInfo * my_session = ctx.get_my_session();
common::sqlclient::ObISQLConnection *dblink_conn = NULL;
ObPhysicalPlanCtx *plan_ctx = GET_PHY_PLAN_CTX(ctx);
ObDbLinkProxy *dblink_proxy = GCTX.dblink_proxy_;
const ObDbLinkSchema *dblink_schema = NULL;
@ -248,15 +247,22 @@ int ObRemoteSequenceExecutor::init_dblink_connection(ObExecContext &ctx)
dblink_schema->get_conn_string(),
dblink_schema->get_cluster_name()))) {
LOG_WARN("failed to create dblink pool", K(ret));
} else if (OB_FAIL(ObDblinkService::get_local_session_vars(my_session, ctx.get_allocator(), param_ctx))) {
} else if (OB_FAIL(my_session->get_dblink_context().get_dblink_conn(dblink_id_, dblink_conn_))) {
LOG_WARN("failed to get dblink connection from session", K(my_session), K(sessid_), K(ret));
} else if (NULL == dblink_conn_) {
if (OB_FAIL(ObDblinkService::get_local_session_vars(my_session, ctx.get_allocator(), param_ctx))) {
LOG_WARN("failed to get local session vars", K(ret));
} else if (OB_FAIL(dblink_proxy->acquire_dblink(param_ctx,
dblink_conn_))) {
LOG_WARN("failed to acquire dblink", K(ret), K(dblink_id_));
} else if (OB_FAIL(my_session->get_dblink_context().register_dblink_conn_pool(dblink_conn_->get_common_server_pool()))) {
LOG_WARN("failed to register dblink conn pool to current session", K(ret));
} else if (OB_FAIL(dblink_proxy->acquire_dblink(param_ctx, dblink_conn_))) {
LOG_WARN("failed to acquire dblink", K(ret), K(param_ctx));
} else if (OB_FAIL(my_session->get_dblink_context().register_dblink_conn_pool(dblink_conn_->get_common_server_pool()))) {
LOG_WARN("failed to register dblink conn pool to current session", K(ret));
} else if (OB_FAIL(my_session->get_dblink_context().set_dblink_conn(dblink_conn_))) {
LOG_WARN("failed to set dblink connection to session", K(my_session), K(sessid_), K(ret));
} else {
LOG_TRACE("link op get connection from dblink pool", KP(dblink_conn_), K(lbt()));
}
} else {
LOG_TRACE("link op get connection from dblink pool", KP(dblink_conn_), K(lbt()));
LOG_TRACE("link op get connection from xa transaction", KP(dblink_conn_));
}
return ret;
}
@ -303,18 +309,14 @@ void ObRemoteSequenceExecutor::reset()
void ObRemoteSequenceExecutor::destroy()
{
int ret = OB_SUCCESS;
#ifdef OB_BUILD_DBLINK
#ifdef OB_BUILD_DBLINK
if (DBLINK_DRV_OCI == link_type_ &&
NULL != dblink_conn_ &&
OB_FAIL(static_cast<ObOciConnection *>(dblink_conn_)->free_oci_stmt())) {
LOG_WARN("failed to close oci result", K(ret));
}
#endif
if (OB_NOT_NULL(GCTX.dblink_proxy_) &&
OB_NOT_NULL(dblink_conn_) &&
OB_FAIL(GCTX.dblink_proxy_->release_dblink(link_type_, dblink_conn_))) {
LOG_WARN("failed to release connection", K(ret));
}
//release dblink connection by session
sessid_ = 0;
dblink_conn_ = NULL;
ObSequenceExecutor::destroy();

View File

@ -305,11 +305,20 @@ int ObSequenceNamespaceChecker::check_link_sequence_exists(const ObDbLinkSchema
dblink_schema->get_conn_string(),
dblink_schema->get_cluster_name()))) {
LOG_WARN("create dblink pool failed", K(ret), K(param_ctx));
} else if (OB_FAIL(dblink_proxy->acquire_dblink(param_ctx,
dblink_conn))) {
LOG_WARN("failed to acquire dblink", K(ret), K(param_ctx));
} else if (OB_FAIL(session_info->get_dblink_context().register_dblink_conn_pool(dblink_conn->get_common_server_pool()))) {
LOG_WARN("failed to register dblink conn pool to current session", K(ret));
} else if (OB_FAIL(session_info->get_dblink_context().get_dblink_conn(dblink_id, dblink_conn))) {
LOG_WARN("failed to get dblink connection from session", K(ret));
} else if (NULL == dblink_conn) {
if (OB_FAIL(dblink_proxy->acquire_dblink(param_ctx, dblink_conn))) {
LOG_WARN("failed to acquire dblink", K(ret), K(param_ctx));
} else if (OB_FAIL(session_info->get_dblink_context().register_dblink_conn_pool(dblink_conn->get_common_server_pool()))) {
LOG_WARN("failed to register dblink conn pool to current session", K(ret));
} else if (OB_FAIL(session_info->get_dblink_context().set_dblink_conn(dblink_conn))) {
LOG_WARN("failed to set dblink connection to session", K(ret));
} else {
LOG_TRACE("link sequence get connection from dblink pool", K(lbt()));
}
}
if (OB_FAIL(ret)) {
} else if (OB_FAIL(dblink_proxy->dblink_read(dblink_conn, res, sql.ptr()))) {
if (OB_ERR_SEQ_NOT_EXIST != ret) {
exists = true;
@ -345,12 +354,7 @@ int ObSequenceNamespaceChecker::check_link_sequence_exists(const ObDbLinkSchema
LOG_WARN("failed to close oci result", K(tmp_ret));
}
#endif
if (OB_SUCCESS != (tmp_ret = dblink_proxy->release_dblink(param_ctx.link_type_, dblink_conn))) {
LOG_WARN("failed to relese connection", K(tmp_ret));
}
if (OB_SUCC(ret)) {
ret = tmp_ret;
}
//release dblink connection by session
}
}
}