diff --git a/src/sql/engine/table/ob_link_scan_op.cpp b/src/sql/engine/table/ob_link_scan_op.cpp index 2ecd50238d..bedf24548c 100644 --- a/src/sql/engine/table/ob_link_scan_op.cpp +++ b/src/sql/engine/table/ob_link_scan_op.cpp @@ -135,6 +135,11 @@ int ObLinkScanOp::free_snapshot() if (OB_FAIL(ob_oci_conn->free_snapshot(snapshot_created_))) { LOG_WARN("set conn snapshot failed", K(ret)); } + int tmp_ret = OB_SUCCESS; + hash::ObHashMap &dblink_snapshot_map = ctx_.get_dblink_snapshot_map(); + if (OB_SUCCESS != (tmp_ret = dblink_snapshot_map.erase_refactored(dblink_id_))) { + LOG_WARN("failed to erase snapshot in map", K(tmp_ret), K(dblink_id_), K(ret)); + } snapshot_created_ = NULL; } return ret; @@ -212,23 +217,33 @@ int ObLinkScanOp::inner_execute_link_stmt(const char *link_stmt) return ret; } -void ObLinkScanOp::reset_dblink() +void ObLinkScanOp::reset_oci_connection()// once read oracle, once reset oci connection { int tmp_ret = OB_SUCCESS; #ifdef OB_BUILD_DBLINK if (DBLINK_DRV_OCI == link_type_ && - NULL != dblink_conn_) { - if (OB_SUCCESS != (tmp_ret = static_cast(dblink_conn_)->free_oci_stmt())) { - LOG_WARN_RET(tmp_ret, "failed to close oci result", K(tmp_ret)); - } - if (OB_UNLIKELY(OB_SUCCESS != (tmp_ret = free_snapshot()))) { - LOG_WARN_RET(tmp_ret, "free dblink snapshot failed"); - } + NULL != dblink_conn_ && + OB_SUCCESS != (tmp_ret = static_cast(dblink_conn_)->free_oci_stmt())) { + LOG_WARN_RET(tmp_ret, "failed to close oci result", K(tmp_ret)); } else if (NULL != tm_rm_connection_ && DblinkDriverProto::DBLINK_DRV_OCI == tm_rm_connection_->get_dblink_driver_proto() && OB_SUCCESS != (tmp_ret = static_cast(tm_rm_connection_)->free_oci_stmt())) { LOG_WARN_RET(tmp_ret, "failed to close oci result", K(tmp_ret)); } +#endif +} + +void ObLinkScanOp::reset_dblink() +{ + int tmp_ret = OB_SUCCESS; +#ifdef OB_BUILD_DBLINK + reset_oci_connection(); + // free oci snapshot when operator close + if (DBLINK_DRV_OCI == link_type_ && + NULL != dblink_conn_ && + OB_UNLIKELY(OB_SUCCESS != (tmp_ret = free_snapshot()))) { + LOG_WARN_RET(tmp_ret, "free dblink snapshot failed"); + } #endif if (OB_NOT_NULL(dblink_proxy_) && OB_NOT_NULL(dblink_conn_) && !in_xa_trascaction_ && OB_SUCCESS != (tmp_ret = dblink_proxy_->release_dblink(link_type_, dblink_conn_))) { @@ -398,6 +413,7 @@ int ObLinkScanOp::fetch_row() LOG_WARN("failed to get next row", K(ret)); } else { reset_result(); + reset_oci_connection(); } } else { const ObIArray &select_exprs = @@ -464,18 +480,11 @@ int ObLinkScanOp::inner_close() int ObLinkScanOp::inner_rescan() { reset_result(); + reset_oci_connection(); reset_link_sql(); iter_end_ = false; iterated_rows_ = -1; int tmp_ret = OB_SUCCESS; -#ifdef OB_BUILD_DBLINK - if (DBLINK_DRV_OCI == link_type_ && - NULL != dblink_conn_) { - if (OB_SUCCESS != (tmp_ret = static_cast(dblink_conn_)->free_oci_stmt())) { - LOG_WARN_RET(tmp_ret, "failed to close oci result", K(tmp_ret)); - } - } -#endif return ObOperator::inner_rescan(); } diff --git a/src/sql/engine/table/ob_link_scan_op.h b/src/sql/engine/table/ob_link_scan_op.h index 42f636937e..7d9a7a4444 100644 --- a/src/sql/engine/table/ob_link_scan_op.h +++ b/src/sql/engine/table/ob_link_scan_op.h @@ -52,6 +52,7 @@ public: private: virtual void reset_dblink() override; void reset_result(); + void reset_oci_connection(); int init_conn_snapshot(bool &new_snapshot); int free_snapshot(); bool need_tx(const ObSQLSessionInfo *my_session) const;