fix dblink transaction bug

This commit is contained in:
cqliang1995
2024-03-15 17:15:43 +00:00
committed by ob-robot
parent 4de78f3ba5
commit 43c435fe20
50 changed files with 1287 additions and 453 deletions

View File

@ -50,7 +50,6 @@ ObLinkScanOp::ObLinkScanOp(ObExecContext &exec_ctx, const ObOpSpec &spec, ObOpIn
iter_end_(false),
row_allocator_(),
iterated_rows_(-1),
tm_session_(NULL),
tm_rm_connection_(NULL),
reverse_link_(NULL),
conn_type_(sql::DblinkGetConnType::DBLINK_POOL),
@ -62,6 +61,7 @@ void ObLinkScanOp::reset()
{
tz_info_ = NULL;
dblink_schema_ = NULL;
tm_sessid_ = 0;
reset_result();
reset_link_sql();
reset_dblink();
@ -152,7 +152,6 @@ int ObLinkScanOp::inner_execute_link_stmt(const char *link_stmt)
uint16_t ncharset_id = 0;
ObSQLSessionInfo * my_session = NULL;
my_session = ctx_.get_my_session();
transaction::ObTransID tx_id;
bool have_lob = false;
res_.set_enable_use_result(true);
bool new_snapshot = false;
@ -174,8 +173,6 @@ int ObLinkScanOp::inner_execute_link_stmt(const char *link_stmt)
} else if (OB_ISNULL(dblink_proxy_) || OB_ISNULL(my_session)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected NULL", K(ret), KP(dblink_proxy_), KP(my_session));
} else if (need_tx(my_session) && OB_FAIL(ObTMService::tm_rm_start(ctx_, link_type_, dblink_conn_, tx_id))) {
LOG_WARN("failed to tm_rm_start", K(ret), K(dblink_id_), K(dblink_conn_), K(sessid_));
#ifdef OB_BUILD_DBLINK
} else if (OB_FAIL(init_conn_snapshot(new_snapshot))) {
LOG_WARN("init conn snapshot failed", K(ret));
@ -207,7 +204,7 @@ int ObLinkScanOp::inner_execute_link_stmt(const char *link_stmt)
ret = OB_NOT_SUPPORTED;
LOG_WARN("dblink not support lob type", K(ret));
LOG_USER_ERROR(OB_NOT_SUPPORTED, "dblink fetch lob type data");
} else if (OB_FAIL(ObLinkOp::get_charset_id(ctx_, charset_id, ncharset_id))) {
} else if (OB_FAIL(ObDblinkService::get_charset_id(my_session, charset_id, ncharset_id))) {
LOG_WARN("failed to get charset id", K(ret));
} else if (OB_FAIL(result_->set_expected_charset_id(charset_id, ncharset_id))) {// for oci dblink set expected result charset, actually useless...
LOG_WARN("failed to set result set expected charset", K(ret), K(charset_id), K(ncharset_id));
@ -245,13 +242,20 @@ void ObLinkScanOp::reset_dblink()
LOG_WARN_RET(tmp_ret, "free dblink snapshot failed");
}
#endif
if (OB_NOT_NULL(dblink_proxy_) && OB_NOT_NULL(dblink_conn_) && !in_xa_trascaction_ &&
if (OB_NOT_NULL(dblink_proxy_) && OB_NOT_NULL(dblink_conn_) && !in_xa_transaction_ &&
OB_SUCCESS != (tmp_ret = dblink_proxy_->release_dblink(link_type_, dblink_conn_))) {
LOG_WARN_RET(tmp_ret, "failed to release connection", K(tmp_ret));
}
if (OB_NOT_NULL(reverse_link_)) {
reverse_link_->close();
}
if (in_xa_transaction_ && OB_NOT_NULL(dblink_conn_)) {
ObDblinkCtxInSession::revert_dblink_conn(dblink_conn_); // release rlock locked by get_dblink_conn
}
if (OB_NOT_NULL(tm_rm_connection_)) {
ObDblinkCtxInSession::revert_dblink_conn(tm_rm_connection_); // release rlock locked by get_dblink_conn
}
tenant_id_ = OB_INVALID_ID;
dblink_id_ = OB_INVALID_ID;
dblink_proxy_ = NULL;
@ -282,21 +286,22 @@ int ObLinkScanOp::inner_open()
ObSQLSessionInfo *session = ctx_.get_my_session();
ObPhysicalPlanCtx *plan_ctx = ctx_.get_physical_plan_ctx();
const ObPhysicalPlan *phy_plan = MY_SPEC.get_phy_plan();
int64_t tm_sessid = -1;
reverse_link_ = NULL;
stmt_buf_len_ += head_comment_length_;
if (NULL != phy_plan) {
tm_sessid = phy_plan->tm_sessid_;
}
if (OB_ISNULL(session) || OB_ISNULL(plan_ctx)) {
dblink_id_ = MY_SPEC.dblink_id_;
dblink_proxy_ = GCTX.dblink_proxy_;
if (OB_ISNULL(session) || OB_ISNULL(plan_ctx) || OB_ISNULL(phy_plan) || OB_ISNULL(dblink_proxy_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("session or plan_ctx is NULL", K(ret), KP(session), KP(plan_ctx));
LOG_WARN("session or plan_ctx or dblink_proxy_ or dblink_proxy_ is NULL", K(ret), KP(session), KP(plan_ctx), KP(phy_plan), KP(dblink_proxy_));
} else if (FALSE_IT(tenant_id_ = session->get_effective_tenant_id())) {
} else if (FALSE_IT(sessid_ = session->get_sessid())) {
} else if (FALSE_IT(tm_sessid_ = plan_ctx->get_tm_sessid())) {
} else if (OB_FAIL(set_next_sql_req_level())) {
LOG_WARN("failed to set next sql req level", K(ret));
} else if (FALSE_IT(tenant_id_ = session->get_effective_tenant_id())) {
} else if (MY_SPEC.is_reverse_link_) {
// RM process sql within @! and @xxxx! send by TM
LOG_DEBUG("link scan op, RM process sql within @! and @xxxx! send by TM");
LOG_TRACE("link scan op, RM process sql within @! and @xxxx! send by TM");
conn_type_ = sql::DblinkGetConnType::TEMP_CONN;
ObPhysicalPlanCtx *plan_ctx = GET_PHY_PLAN_CTX(ctx_);
if (OB_ISNULL(plan_ctx)) {
@ -310,46 +315,38 @@ int ObLinkScanOp::inner_open()
} else if (OB_FAIL(reverse_link_->open(next_sql_req_level_))) {
LOG_WARN("failed to open reverse_link", K(ret));
}
} else if (-1 != tm_sessid) { // TM process sql within @xxxx send by RM
LOG_DEBUG("link scan op, TM process sql within @xxxx send by RM", K(tm_sessid));
sql::ObSQLSessionMgr *session_mgr = GCTX.session_mgr_;
if (OB_ISNULL(session_mgr)) {
} else if (0 != tm_sessid_) { // TM process sql within @xxxx send by RM
LOG_TRACE("link scan op, TM process sql within @xxxx send by RM", K(tm_sessid_));
if (OB_FAIL(session->get_dblink_context().get_dblink_conn(MY_SPEC.dblink_id_, tm_rm_connection_, tm_sessid_))) {
LOG_TRACE("link scan op failed to get tm_tm_connection", K(tm_sessid_), K(MY_SPEC.dblink_id_), K(ret));
} else if (OB_ISNULL(tm_rm_connection_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected null", K(ret), KP(session_mgr));
} else if (OB_FAIL(session_mgr->get_session(static_cast<uint32_t>(tm_sessid), tm_session_))) {
LOG_WARN("failed to get session", K(ret), K(tm_sessid));
LOG_WARN("unexcepted null ptr", K(tm_sessid_), K(MY_SPEC.dblink_id_), K(ret));
} else {
if (NULL != tm_session_ &&
tm_session_->get_dblink_context().get_dblink_conn(MY_SPEC.dblink_id_, tm_rm_connection_)) {
LOG_WARN("failed to get dblink connection from session", KP(tm_session_), K(ret));
} else if (NULL != tm_rm_connection_){
conn_type_ = sql::DblinkGetConnType::TM_CONN;
LOG_DEBUG("get tm sesseion and connection", KP(tm_session_), KP(tm_rm_connection_));
}
session_mgr->revert_session(tm_session_);
tm_session_ = NULL;
conn_type_ = sql::DblinkGetConnType::TM_CONN;
link_type_ = tm_rm_connection_->get_dblink_driver_proto();
LOG_TRACE("get tm sesseion and connection", KP(tm_rm_connection_), K(link_type_), K(tm_sessid_), K(sessid_), K(ret));
}
} else if (OB_FAIL(init_dblink())) {
LOG_WARN("failed to init dblink", K(ret), K(dblink_id_), K(MY_SPEC.is_reverse_link_));
}
if (OB_FAIL(ret)) {
// do nothing
} else if (sql::DblinkGetConnType::DBLINK_POOL == conn_type_ &&
OB_FAIL(init_dblink(MY_SPEC.dblink_id_, GCTX.dblink_proxy_, MY_SPEC.has_for_update_))) {
LOG_WARN("failed to init dblink", K(ret), K(MY_SPEC.dblink_id_), K(MY_SPEC.is_reverse_link_));
} else if (OB_FAIL(init_tz_info(TZ_INFO(session)))) {
if (OB_SUCC(ret)) {
if (OB_FAIL(init_tz_info(TZ_INFO(session)))) {
LOG_WARN("failed to tz info", K(ret), KP(session));
} else {
row_allocator_.set_tenant_id(tenant_id_);
row_allocator_.set_label("linkoprow");
row_allocator_.set_ctx_id(ObCtxIds::WORK_AREA);
}
if (OB_SUCC(ret) && OB_ISNULL(stmt_buf_ = static_cast<char *>(allocator_.alloc(stmt_buf_len_)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("failed to init stmt buf", K(ret), K(stmt_buf_len_));
} else if (OB_ISNULL(stmt_buf_ = static_cast<char *>(allocator_.alloc(stmt_buf_len_)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("failed to init stmt buf", K(ret), K(stmt_buf_len_));
} else {
row_allocator_.set_tenant_id(tenant_id_);
row_allocator_.set_label("linkoprow");
row_allocator_.set_ctx_id(ObCtxIds::WORK_AREA);
}
LOG_TRACE("succ to open link scan op", K(dblink_id_), K(tm_sessid_), K(MY_SPEC.is_reverse_link_), K(conn_type_), K(ret));
}
// close reverse_link
if (NULL != reverse_link_ && OB_FAIL(ret)) {
reverse_link_->close();
LOG_DEBUG("close reverse link", KP(reverse_link_), K(ret));
LOG_TRACE("close reverse link", KP(reverse_link_), K(ret));
}
return ret;
}
@ -544,19 +541,5 @@ int ObLinkScanOp::inner_get_next_batch(const int64_t max_row_cnt)
return ret;
}
// PLEASE check the input parameter in public interface
bool ObLinkScanOp::need_tx(const ObSQLSessionInfo *my_session) const
{
bool ret_bool = false;
if (MY_SPEC.has_for_update_) {
// case 1, if select for update, tm_rm_start is required to be called.
ret_bool = true;
} else if (false && my_session->is_in_transaction()) {
// case 2, if select in transaction, tm_rm_start is required to be called.
ret_bool = true;
}
return ret_bool;
}
} // end namespace sql
} // end namespace oceanbase

View File

@ -64,7 +64,6 @@ private:
bool iter_end_;
common::ObArenaAllocator row_allocator_;
int64_t iterated_rows_;
ObSQLSessionInfo *tm_session_;
common::sqlclient::ObISQLConnection *tm_rm_connection_;
ObReverseLink *reverse_link_;
sql::DblinkGetConnType conn_type_;