[FEAT MERGE] 开源裁减颗粒度优化

Co-authored-by: nroskill <nroskill@gmail.com>
Co-authored-by: akaError <lzg020616@163.com>
Co-authored-by: yinyj17 <yinyijun92@gmail.com>
This commit is contained in:
wenxingsen
2023-08-15 02:40:25 +00:00
committed by ob-robot
parent a31e422133
commit 9b31f8aa03
378 changed files with 26718 additions and 2379 deletions

View File

@ -24,6 +24,9 @@
#include "lib/string/ob_hex_utils_base.h"
#include "sql/session/ob_sql_session_mgr.h"
#include "sql/engine/expr/ob_expr_lob_utils.h"
#ifdef OB_BUILD_DBLINK
#include "lib/oracleclient/ob_oci_connection.h"
#endif
namespace oceanbase
{
using namespace common;
@ -76,8 +79,67 @@ int ObLinkScanOp::init_tz_info(const ObTimeZoneInfo *tz_info)
}
return ret;
}
#ifdef OB_BUILD_DBLINK
int ObLinkScanOp::init_conn_snapshot(bool &new_snapshot)
{
int ret = OB_SUCCESS;
new_snapshot = false;
void *snapshot = NULL;
bool snapshot_inited = false;
if (NULL == dblink_conn_ || DBLINK_DRV_OCI != link_type_) {
// do nothing.
} else {
hash::ObHashMap<uint64_t, void*> &dblink_snapshot_map = ctx_.get_dblink_snapshot_map();
ObOciConnection *ob_oci_conn = static_cast<ObOciConnection*>(dblink_conn_);
if (!dblink_snapshot_map.created() && OB_FAIL(dblink_snapshot_map.create(8, "dblinksnapshot"))) {
LOG_WARN("create hash map failed", K(ret));
} else if (OB_FAIL(dblink_snapshot_map.get_refactored(dblink_id_, snapshot))) {
if (OB_HASH_NOT_EXIST != ret) {
LOG_WARN("get dblink snapshot failed", K(ret));
} else if (OB_FAIL(ob_oci_conn->create_snapshot(snapshot))) {
LOG_WARN("init snapshot failed", K(ret));
} else if (OB_ISNULL(snapshot)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("create null snapshot", K(ret), K(get_spec().get_id()), K(link_type_));
} else {
snapshot_created_ = snapshot;
new_snapshot = true;
snapshot_inited = false;
LOG_TRACE("init conn snapshot, create one", K(get_spec().get_id()), K(ob_oci_conn),
K(ob_oci_conn->get_oci_connection()), K(snapshot));
}
} else {
snapshot_inited = true;
LOG_TRACE("init conn snapshot, fetch one", K(get_spec().get_id()), K(ob_oci_conn),
K(ob_oci_conn->get_oci_connection()), K(snapshot));
}
if (OB_SUCC(ret) && OB_FAIL(ob_oci_conn->set_conn_snapshot(snapshot, snapshot_inited))) {
LOG_ERROR("set conn snapshot failed", K(ret));
}
}
return ret;
}
int ObLinkScanOp::free_snapshot()
{
int ret = OB_SUCCESS;
if (NULL == snapshot_created_) {
// do nothing.
} else if (OB_UNLIKELY(NULL == dblink_conn_ || DBLINK_DRV_OCI != link_type_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("dblink conn is null or unexpected dblink type", K(ret), K(dblink_conn_), K(link_type_));
} else {
ObOciConnection *ob_oci_conn = static_cast<ObOciConnection*>(dblink_conn_);
void *snapshot = NULL;
bool snapshot_inited = false;
if (OB_FAIL(ob_oci_conn->free_snapshot(snapshot_created_))) {
LOG_WARN("set conn snapshot failed", K(ret));
}
snapshot_created_ = NULL;
}
return ret;
}
#endif
int ObLinkScanOp::inner_execute_link_stmt(const char *link_stmt)
{
int ret = OB_SUCCESS;
@ -107,9 +169,26 @@ int ObLinkScanOp::inner_execute_link_stmt(const char *link_stmt)
} else if (OB_ISNULL(dblink_proxy_) || OB_ISNULL(my_session)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected NULL", K(ret), KP(dblink_proxy_), KP(my_session));
} else if (need_tx(my_session) && OB_FAIL(ObTMService::tm_rm_start(ctx_, link_type_, dblink_conn_, tx_id))) {
LOG_WARN("failed to tm_rm_start", K(ret), K(dblink_id_), K(dblink_conn_), K(sessid_));
#ifdef OB_BUILD_DBLINK
} else if (OB_FAIL(init_conn_snapshot(new_snapshot))) {
LOG_WARN("init conn snapshot failed", K(ret));
#endif
} else {
if (OB_FAIL(dblink_proxy_->dblink_read(dblink_conn_, res_, link_stmt))) {
LOG_WARN("read failed", K(ret), K(link_type_), K(dblink_conn_), K(link_stmt));
#ifdef OB_BUILD_DBLINK
} else if (new_snapshot && OB_FAIL(ctx_.get_dblink_snapshot_map().set_refactored(dblink_id_, snapshot_created_))) {
LOG_WARN("set snapshot failed", K(ret));
}
if (OB_FAIL(ret) && new_snapshot) {
int tmp_ret = OB_SUCCESS;
// when fail and have created new snapshot before, need free this snapshot.
if (OB_UNLIKELY(OB_SUCCESS != (tmp_ret = free_snapshot()))) {
LOG_ERROR("free dblink snapshot failed", K(tmp_ret));
}
#endif
}
}
if (OB_FAIL(ret)) {
@ -136,6 +215,21 @@ int ObLinkScanOp::inner_execute_link_stmt(const char *link_stmt)
void ObLinkScanOp::reset_dblink()
{
int tmp_ret = OB_SUCCESS;
#ifdef OB_BUILD_DBLINK
if (DBLINK_DRV_OCI == link_type_ &&
NULL != dblink_conn_) {
if (OB_SUCCESS != (tmp_ret = static_cast<ObOciConnection *>(dblink_conn_)->free_oci_stmt())) {
LOG_WARN_RET(tmp_ret, "failed to close oci result", K(tmp_ret));
}
if (OB_UNLIKELY(OB_SUCCESS != (tmp_ret = free_snapshot()))) {
LOG_WARN_RET(tmp_ret, "free dblink snapshot failed");
}
} else if (NULL != tm_rm_connection_ &&
DblinkDriverProto::DBLINK_DRV_OCI == tm_rm_connection_->get_dblink_driver_proto() &&
OB_SUCCESS != (tmp_ret = static_cast<ObOciConnection *>(tm_rm_connection_)->free_oci_stmt())) {
LOG_WARN_RET(tmp_ret, "failed to close oci result", K(tmp_ret));
}
#endif
if (OB_NOT_NULL(dblink_proxy_) && OB_NOT_NULL(dblink_conn_) && !in_xa_trascaction_ &&
OB_SUCCESS != (tmp_ret = dblink_proxy_->release_dblink(link_type_, dblink_conn_))) {
LOG_WARN_RET(tmp_ret, "failed to release connection", K(tmp_ret));
@ -368,6 +462,14 @@ int ObLinkScanOp::inner_rescan()
iter_end_ = false;
iterated_rows_ = -1;
int tmp_ret = OB_SUCCESS;
#ifdef OB_BUILD_DBLINK
if (DBLINK_DRV_OCI == link_type_ &&
NULL != dblink_conn_) {
if (OB_SUCCESS != (tmp_ret = static_cast<ObOciConnection *>(dblink_conn_)->free_oci_stmt())) {
LOG_WARN_RET(tmp_ret, "failed to close oci result", K(tmp_ret));
}
}
#endif
return ObOperator::inner_rescan();
}