diff --git a/src/sql/engine/sequence/ob_sequence_op.cpp b/src/sql/engine/sequence/ob_sequence_op.cpp index 78f21baa67..0c601e190c 100644 --- a/src/sql/engine/sequence/ob_sequence_op.cpp +++ b/src/sql/engine/sequence/ob_sequence_op.cpp @@ -207,7 +207,6 @@ int ObRemoteSequenceExecutor::init_dblink_connection(ObExecContext &ctx) { int ret = OB_SUCCESS; ObSQLSessionInfo * my_session = ctx.get_my_session(); - common::sqlclient::ObISQLConnection *dblink_conn = NULL; ObPhysicalPlanCtx *plan_ctx = GET_PHY_PLAN_CTX(ctx); ObDbLinkProxy *dblink_proxy = GCTX.dblink_proxy_; const ObDbLinkSchema *dblink_schema = NULL; @@ -248,15 +247,22 @@ int ObRemoteSequenceExecutor::init_dblink_connection(ObExecContext &ctx) dblink_schema->get_conn_string(), dblink_schema->get_cluster_name()))) { LOG_WARN("failed to create dblink pool", K(ret)); - } else if (OB_FAIL(ObDblinkService::get_local_session_vars(my_session, ctx.get_allocator(), param_ctx))) { + } else if (OB_FAIL(my_session->get_dblink_context().get_dblink_conn(dblink_id_, dblink_conn_))) { + LOG_WARN("failed to get dblink connection from session", K(my_session), K(sessid_), K(ret)); + } else if (NULL == dblink_conn_) { + if (OB_FAIL(ObDblinkService::get_local_session_vars(my_session, ctx.get_allocator(), param_ctx))) { LOG_WARN("failed to get local session vars", K(ret)); - } else if (OB_FAIL(dblink_proxy->acquire_dblink(param_ctx, - dblink_conn_))) { - LOG_WARN("failed to acquire dblink", K(ret), K(dblink_id_)); - } else if (OB_FAIL(my_session->get_dblink_context().register_dblink_conn_pool(dblink_conn_->get_common_server_pool()))) { - LOG_WARN("failed to register dblink conn pool to current session", K(ret)); + } else if (OB_FAIL(dblink_proxy->acquire_dblink(param_ctx, dblink_conn_))) { + LOG_WARN("failed to acquire dblink", K(ret), K(param_ctx)); + } else if (OB_FAIL(my_session->get_dblink_context().register_dblink_conn_pool(dblink_conn_->get_common_server_pool()))) { + LOG_WARN("failed to register dblink conn pool to current session", K(ret)); + } else if (OB_FAIL(my_session->get_dblink_context().set_dblink_conn(dblink_conn_))) { + LOG_WARN("failed to set dblink connection to session", K(my_session), K(sessid_), K(ret)); + } else { + LOG_TRACE("link op get connection from dblink pool", KP(dblink_conn_), K(lbt())); + } } else { - LOG_TRACE("link op get connection from dblink pool", KP(dblink_conn_), K(lbt())); + LOG_TRACE("link op get connection from xa transaction", KP(dblink_conn_)); } return ret; } @@ -303,18 +309,14 @@ void ObRemoteSequenceExecutor::reset() void ObRemoteSequenceExecutor::destroy() { int ret = OB_SUCCESS; -#ifdef OB_BUILD_DBLINK + #ifdef OB_BUILD_DBLINK if (DBLINK_DRV_OCI == link_type_ && NULL != dblink_conn_ && OB_FAIL(static_cast(dblink_conn_)->free_oci_stmt())) { LOG_WARN("failed to close oci result", K(ret)); } #endif - if (OB_NOT_NULL(GCTX.dblink_proxy_) && - OB_NOT_NULL(dblink_conn_) && - OB_FAIL(GCTX.dblink_proxy_->release_dblink(link_type_, dblink_conn_))) { - LOG_WARN("failed to release connection", K(ret)); - } + //release dblink connection by session sessid_ = 0; dblink_conn_ = NULL; ObSequenceExecutor::destroy(); diff --git a/src/sql/resolver/dml/ob_sequence_namespace_checker.cpp b/src/sql/resolver/dml/ob_sequence_namespace_checker.cpp index 3dadea7dc3..879b350905 100644 --- a/src/sql/resolver/dml/ob_sequence_namespace_checker.cpp +++ b/src/sql/resolver/dml/ob_sequence_namespace_checker.cpp @@ -305,11 +305,20 @@ int ObSequenceNamespaceChecker::check_link_sequence_exists(const ObDbLinkSchema dblink_schema->get_conn_string(), dblink_schema->get_cluster_name()))) { LOG_WARN("create dblink pool failed", K(ret), K(param_ctx)); - } else if (OB_FAIL(dblink_proxy->acquire_dblink(param_ctx, - dblink_conn))) { - LOG_WARN("failed to acquire dblink", K(ret), K(param_ctx)); - } else if (OB_FAIL(session_info->get_dblink_context().register_dblink_conn_pool(dblink_conn->get_common_server_pool()))) { - LOG_WARN("failed to register dblink conn pool to current session", K(ret)); + } else if (OB_FAIL(session_info->get_dblink_context().get_dblink_conn(dblink_id, dblink_conn))) { + LOG_WARN("failed to get dblink connection from session", K(ret)); + } else if (NULL == dblink_conn) { + if (OB_FAIL(dblink_proxy->acquire_dblink(param_ctx, dblink_conn))) { + LOG_WARN("failed to acquire dblink", K(ret), K(param_ctx)); + } else if (OB_FAIL(session_info->get_dblink_context().register_dblink_conn_pool(dblink_conn->get_common_server_pool()))) { + LOG_WARN("failed to register dblink conn pool to current session", K(ret)); + } else if (OB_FAIL(session_info->get_dblink_context().set_dblink_conn(dblink_conn))) { + LOG_WARN("failed to set dblink connection to session", K(ret)); + } else { + LOG_TRACE("link sequence get connection from dblink pool", K(lbt())); + } + } + if (OB_FAIL(ret)) { } else if (OB_FAIL(dblink_proxy->dblink_read(dblink_conn, res, sql.ptr()))) { if (OB_ERR_SEQ_NOT_EXIST != ret) { exists = true; @@ -345,12 +354,7 @@ int ObSequenceNamespaceChecker::check_link_sequence_exists(const ObDbLinkSchema LOG_WARN("failed to close oci result", K(tmp_ret)); } #endif - if (OB_SUCCESS != (tmp_ret = dblink_proxy->release_dblink(param_ctx.link_type_, dblink_conn))) { - LOG_WARN("failed to relese connection", K(tmp_ret)); - } - if (OB_SUCC(ret)) { - ret = tmp_ret; - } + //release dblink connection by session } } }