/**
 * Copyright (c) 2021 OceanBase
 * OceanBase CE is licensed under Mulan PubL v2.
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
 * You may obtain a copy of Mulan PubL v2 at:
 *          http://license.coscl.org.cn/MulanPubL-2.0
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PubL v2 for more details.
 */

#define USING_LOG_PREFIX SQL_EXE

#include "share/ob_schema_status_proxy.h"  // ObSchemaStatusProxy
#include "share/ob_encryption_util.h"
#include "sql/ob_sql_trans_control.h"
#include "sql/ob_sql_trans_hook.h"
#include "sql/engine/ob_physical_plan.h"
#include "sql/engine/ob_physical_plan_ctx.h"
#include "sql/parser/parse_malloc.h"
#include "sql/resolver/ob_stmt.h"
#include "sql/session/ob_sql_session_info.h"
#include "sql/ob_sql_trans_util.h"
#include "sql/ob_end_trans_callback.h"
#include "lib/oblog/ob_trace_log.h"
#include "storage/ob_partition_service.h"
#include "storage/transaction/ob_trans_define.h"
#include "common/ob_partition_key.h"
#include "sql/engine/ob_exec_context.h"
#include "sql/executor/ob_task_spliter.h"
#include "lib/profile/ob_perf_event.h"
#include "observer/ob_server_struct.h"
#include "observer/ob_server.h"
#include "storage/transaction/ob_weak_read_util.h"  //ObWeakReadUtil

// Debug tracing helper: logs the session's autocommit / in-transaction flags.
// The `#if 0` guard compiles it out, so DEBUG_AC_TRANS expands to nothing.
// NOTE(review): the disabled body calls get_autocommit() with no argument,
// while live code below uses get_autocommit(bool&); it may not compile if
// re-enabled -- confirm before flipping the guard.
#if 0
#define DEBUG_AC_TRANS(exec_ctx)                               \
  {                                                            \
    ObSQLSessionInfo* __my_session = GET_MY_SESSION(exec_ctx); \
    OB_ASSERT(NULL != __my_session);                           \
    bool ac = __my_session->get_autocommit();                  \
    bool in_trans = __my_session->get_in_transaction();        \
    LOG_INFO("ac & in_trans", K(ac), K(in_trans));             \
  }
#else
#define DEBUG_AC_TRANS(exec_ctx)
#endif

// Debug tracing helper: when enabled, logs the transaction descriptor tagged
// with `stage` for sessions that have the temp-table flag set. It refers to a
// variable named `my_session`, which must be in scope at the expansion site.
// Also compiled out by `#if 0`.
#if 0
#define DEBUG_TRANS_STAGE(stage)                                   \
  if (my_session->get_has_temp_table_flag()) {                     \
    LOG_WARN("TMP_TABLE " stage, K(my_session->get_trans_desc())); \
  }
#else
#define DEBUG_TRANS_STAGE(stage)
#endif

namespace oceanbase {
using namespace common;
using namespace transaction;
using namespace share;
using namespace share::schema;
namespace sql {

// Hook invoked when a physical plan starts executing.
//
// A plain SELECT (no FOR UPDATE) that is not attached to a live transaction
// descriptor, on a session supporting external consistency, on a cluster at
// version >= 2.2.60, not inside an XA local transaction, and running either in
// Oracle mode or with local autocommit, is run as a standalone statement via
// start_standalone_stmt().
//
// Otherwise an implicit transaction is started when
// ObSqlTransUtil::plan_can_start_trans() allows it for the current
// autocommit / in-transaction state.
//
// @param exec_ctx  execution context; must provide a session and a plan ctx
//                  that holds a physical plan.
// @param is_remote forwarded to the start routines (default false).
// @return OB_SUCCESS;
//         OB_ERR_UNEXPECTED when the session / plan ctx / plan is missing;
//         OB_ERR_SESSION_INTERRUPTED when the session was killed;
//         otherwise the error from get_autocommit() or from the start routine.
int ObSqlTransControl::on_plan_start(ObExecContext& exec_ctx, bool is_remote /* = false*/)
{
  DEBUG_AC_TRANS(exec_ctx);
  int ret = OB_SUCCESS;
  bool ac = true;
  ObSQLSessionInfo* my_session = GET_MY_SESSION(exec_ctx);
  ObPhysicalPlanCtx* plan_ctx = GET_PHY_PLAN_CTX(exec_ctx);
  if (OB_ISNULL(my_session) || OB_ISNULL(plan_ctx) || OB_ISNULL(plan_ctx->get_phy_plan())) {
    ret = OB_ERR_UNEXPECTED;
    LOG_ERROR("session ptr or plan ctx or plan_ctx->get_phy_plan is null", K(ret), K(my_session), K(plan_ctx));
  } else if (OB_UNLIKELY(my_session->is_zombie())) {
    // session has been killed some moment ago
    ret = OB_ERR_SESSION_INTERRUPTED;
    LOG_WARN("session has been killed",
        K(ret),
        K(my_session->get_session_state()),
        K(my_session->get_sessid()),
        "proxy_sessid",
        my_session->get_proxy_sessid());
  } else if (OB_FAIL(my_session->get_autocommit(ac))) {
    LOG_WARN("fail to get autocommit", K(ret));
  } else if (stmt::T_SELECT == plan_ctx->get_phy_plan()->get_stmt_type() &&
             !plan_ctx->get_phy_plan()->has_for_update() &&
             // no live transaction descriptor: invalid or already ended
             (!my_session->get_trans_desc().is_valid() || my_session->get_trans_desc().is_trans_end()) &&
             my_session->is_support_external_consistent() &&
             GET_MIN_CLUSTER_VERSION() >= CLUSTER_VERSION_2260 &&
             // not inside an XA local transaction
             !(my_session->is_in_transaction() && my_session->get_trans_desc().is_valid() &&
                 my_session->get_trans_desc().is_xa_local_trans()) &&
             (share::is_oracle_mode() || my_session->get_local_autocommit())) {
    OZ(start_standalone_stmt(exec_ctx, *my_session, *plan_ctx, is_remote));
  } else {
    bool in_trans = my_session->get_in_transaction();
    if (ObSqlTransUtil::plan_can_start_trans(ac, in_trans)) {
      ret = implicit_start_trans(exec_ctx, is_remote);
    }
  }
  return ret;
}

// Commented-out implementation of on_plan_end (disabled code, kept for reference).
/*int ObSqlTransControl::on_plan_end(ObExecContext &exec_ctx, bool is_rollback, ObExclusiveEndTransCallback &callback)
{
  DEBUG_AC_TRANS(exec_ctx);
  int ret = OB_SUCCESS;
  ObSQLSessionInfo *my_session = GET_MY_SESSION(exec_ctx);
  bool ac = true;
if (OB_ISNULL(my_session)) { ret = OB_ERR_UNEXPECTED; LOG_ERROR("session ptr is null", K(ret)); } else if (OB_UNLIKELY(my_session->is_zombie())) { //session has been killed some moment ago ret = OB_ERR_SESSION_INTERRUPTED; LOG_WARN("session has been killed", K(ret), K(my_session->get_session_state()), K(my_session->get_sessid()), "proxy_sessid", my_session->get_proxy_sessid()); } else if (OB_FAIL(my_session->get_autocommit(ac))) { LOG_WARN("fail to get autocommit", K(ret)); } else { bool in_trans = my_session->get_in_transaction(); if (ObSqlTransUtil::plan_can_end_trans(ac, in_trans)) { ret = implicit_end_trans(exec_ctx, is_rollback, callback); } } return ret; }*/ int ObSqlTransControl::explicit_start_trans(ObExecContext& ctx, const bool read_only) { int ret = OB_SUCCESS; ObPhysicalPlanCtx* plan_ctx = GET_PHY_PLAN_CTX(ctx); ObSQLSessionInfo* my_session = GET_MY_SESSION(ctx); ObTaskExecutorCtx& task_exec_ctx = ctx.get_task_exec_ctx(); if (OB_ISNULL(plan_ctx) || OB_ISNULL(my_session) || OB_UNLIKELY(!task_exec_ctx.min_cluster_version_is_valid())) { ret = OB_ERR_UNEXPECTED; LOG_ERROR("invalid param", K(ret), K(plan_ctx), K(my_session), K(task_exec_ctx.get_min_cluster_version())); } else { int32_t access_mode = read_only ? 
ObTransAccessMode::READ_ONLY : ObTransAccessMode::READ_WRITE; int32_t tx_isolation = my_session->get_tx_isolation(); ObStartTransParam& start_trans_param = plan_ctx->get_start_trans_param(); start_trans_param.set_access_mode(access_mode); start_trans_param.set_isolation(tx_isolation); start_trans_param.set_type(my_session->get_trans_type()); start_trans_param.set_cluster_version(task_exec_ctx.get_min_cluster_version()); start_trans_param.set_inner_trans(my_session->is_inner()); NG_TRACE_EXT(start_trans, OB_ID(read_only), read_only, OB_ID(access_mode), access_mode); if (OB_FAIL(ObSqlTransControl::explicit_start_trans(ctx, start_trans_param))) { LOG_WARN("fail start trans", K(ret)); } } return ret; } int ObSqlTransControl::explicit_start_trans(ObExecContext& exec_ctx, transaction::ObStartTransParam& start_trans_param) { int ret = OB_SUCCESS; ObSQLSessionInfo* my_session = GET_MY_SESSION(exec_ctx); if (OB_ISNULL(my_session)) { ret = OB_ERR_UNEXPECTED; LOG_ERROR("session ptr is null", K(ret)); } else if (OB_UNLIKELY(my_session->is_zombie())) { // session has been killed some moment ago ret = OB_ERR_SESSION_INTERRUPTED; LOG_WARN("session has been killed", K(ret), K(my_session->get_session_state()), K(my_session->get_sessid()), "proxy_sessid", my_session->get_proxy_sessid()); } else { if (false == my_session->get_in_transaction()) { if (OB_FAIL(start_trans(exec_ctx, start_trans_param))) { LOG_WARN("fail start explicit trans", K(ret)); } else { bool is_read_only = (start_trans_param.get_access_mode() == transaction::ObTransAccessMode::READ_ONLY) ? 
true : false; my_session->set_tx_read_only(is_read_only); my_session->set_in_transaction(true); } } } return ret; } int ObSqlTransControl::implicit_start_trans(ObExecContext& exec_ctx, bool is_remote /* = false*/) { int ret = OB_SUCCESS; ObPhysicalPlanCtx* plan_ctx = GET_PHY_PLAN_CTX(exec_ctx); ObSQLSessionInfo* my_session = GET_MY_SESSION(exec_ctx); ObTaskExecutorCtx& task_exec_ctx = exec_ctx.get_task_exec_ctx(); bool ac = true; if (OB_ISNULL(plan_ctx) || OB_ISNULL(my_session) || OB_ISNULL(plan_ctx->get_phy_plan())) { ret = OB_ERR_UNEXPECTED; LOG_WARN("can't get plan_ctx or session or phy_plan", K(ret), K(plan_ctx), K(my_session)); } else if (OB_UNLIKELY(!is_remote && !task_exec_ctx.min_cluster_version_is_valid())) { ret = OB_ERR_UNEXPECTED; LOG_ERROR("is not remote start trans, but task_exec_ctx.get_min_cluster_version() is invalid", K(ret), K(task_exec_ctx.get_min_cluster_version())); } else if (OB_UNLIKELY(my_session->is_zombie())) { // session has been killed some moment ago ret = OB_ERR_SESSION_INTERRUPTED; LOG_WARN("session has been killed", K(ret), K(my_session->get_session_state()), K(my_session->get_sessid()), "proxy_sessid", my_session->get_proxy_sessid()); } else if (OB_FAIL(my_session->get_autocommit(ac))) { LOG_WARN("fail to get autocommit", K(ret)); } else { bool in_trans = my_session->get_in_transaction(); bool tx_read_only = my_session->get_tx_read_only(); int32_t tx_isolation = my_session->get_tx_isolation(); transaction::ObStartTransParam& start_trans_param = plan_ctx->get_start_trans_param(); if (true == ac && false == in_trans && stmt::T_SELECT == plan_ctx->get_phy_plan()->get_stmt_type() && !plan_ctx->get_phy_plan()->has_for_update()) { start_trans_param.set_access_mode(transaction::ObTransAccessMode::READ_ONLY); } else if (true == tx_read_only) { start_trans_param.set_access_mode(transaction::ObTransAccessMode::READ_ONLY); } else { start_trans_param.set_access_mode(transaction::ObTransAccessMode::READ_WRITE); } 
start_trans_param.set_type(my_session->get_trans_type()); start_trans_param.set_isolation(tx_isolation); start_trans_param.set_autocommit(ac && !in_trans); if (task_exec_ctx.min_cluster_version_is_valid()) { start_trans_param.set_cluster_version(task_exec_ctx.get_min_cluster_version()); } else { start_trans_param.set_cluster_version(GET_MIN_CLUSTER_VERSION()); } start_trans_param.set_inner_trans(my_session->is_inner()); if (OB_FAIL(start_trans(exec_ctx, start_trans_param, is_remote))) { LOG_WARN("fail start trans", K(ret)); } if (OB_SUCC(ret) && false == ac) { my_session->set_in_transaction(true); } } return ret; } int ObSqlTransControl::start_trans( ObExecContext& exec_ctx, transaction::ObStartTransParam& start_trans_param, bool is_remote /* = false*/) { DEBUG_AC_TRANS(exec_ctx); int ret = OB_SUCCESS; int64_t org_cluster_id = OB_INVALID_ORG_CLUSTER_ID; ObSQLSessionInfo* my_session = GET_MY_SESSION(exec_ctx); ObTaskExecutorCtx* executor_ctx = GET_TASK_EXECUTOR_CTX(exec_ctx); ObPhysicalPlanCtx* plan_ctx = exec_ctx.get_physical_plan_ctx(); storage::ObPartitionService* ps = NULL; if (OB_ISNULL(executor_ctx) || OB_ISNULL(my_session) || OB_ISNULL(plan_ctx) || OB_ISNULL(ps = executor_ctx->get_partition_service())) { ret = OB_ERR_UNEXPECTED; LOG_ERROR("null ptr", K(ret), K(executor_ctx), K(my_session), K(plan_ctx), K(ps)); } else if (OB_UNLIKELY(my_session->is_zombie())) { // session has been killed some moment ago ret = OB_ERR_SESSION_INTERRUPTED; LOG_WARN("session has been killed", K(ret), K(my_session->get_session_state()), K(my_session->get_sessid()), "proxy_sessid", my_session->get_proxy_sessid()); } else if (OB_FAIL(my_session->get_ob_org_cluster_id(org_cluster_id))) { if (OB_LIKELY(is_remote && OB_ENTRY_NOT_EXIST == ret)) { LOG_WARN("start trans in remote, ob_org_cluster_id is not serilized to the remote server, " "maybe server version is different, then ignore it, " "use default cluster id and set ret to OB_SUCCESS"); ret = OB_SUCCESS; org_cluster_id = 
ObServerConfig::get_instance().cluster_id; } else { LOG_WARN("fail to get ob_org_cluster_id", K(ret)); } } if (OB_FAIL(ret)) { } else { if (OB_INVALID_ORG_CLUSTER_ID == org_cluster_id || OB_INVALID_CLUSTER_ID == org_cluster_id) { org_cluster_id = ObServerConfig::get_instance().cluster_id; if (OB_UNLIKELY(org_cluster_id < OB_MIN_CLUSTER_ID || org_cluster_id > OB_MAX_CLUSTER_ID)) { ret = OB_ERR_UNEXPECTED; LOG_ERROR("org_cluster_id is set to cluster_id, but it is out of range", K(ret), K(org_cluster_id), K(OB_MIN_CLUSTER_ID), K(OB_MAX_CLUSTER_ID)); } } if (OB_SUCC(ret)) { my_session->get_trans_result().reset(); transaction::ObTransDesc& trans_desc = my_session->get_trans_desc(); const uint64_t tenant_id = my_session->get_effective_tenant_id(); int64_t trans_timeout_ts = 0; if (OB_FAIL(get_trans_timeout_ts(*my_session, trans_timeout_ts))) { LOG_ERROR("fail to get trans timeout ts", K(ret)); } else if (OB_FAIL(ps->start_trans(tenant_id, static_cast(org_cluster_id), start_trans_param, trans_timeout_ts, my_session->get_sessid(), my_session->get_proxy_sessid(), trans_desc))) { LOG_WARN("fail start trans", K(ret)); } DEBUG_TRANS_STAGE("start_trans"); NG_TRACE_EXT(start_trans, OB_ID(trans_id), trans_desc.get_trans_id(), OB_ID(timeout), trans_timeout_ts, OB_ID(start_time), my_session->get_query_start_time()); } } return ret; } int ObSqlTransControl::implicit_end_trans( ObExecContext& exec_ctx, const bool is_rollback, ObExclusiveEndTransCallback& callback) { int ret = OB_SUCCESS; const bool is_explicit = false; ObSQLSessionInfo* my_session = GET_MY_SESSION(exec_ctx); if (OB_ISNULL(my_session)) { ret = OB_ERR_UNEXPECTED; LOG_ERROR("session ptr is null, won't call end trans!!!", K(ret)); } else if (OB_FAIL(end_trans(exec_ctx, is_rollback, is_explicit, callback))) { LOG_WARN("fail to end trans", K(ret), K(is_rollback)); } return ret; } int ObSqlTransControl::explicit_end_trans(ObExecContext& exec_ctx, const bool is_rollback) { int ret = OB_SUCCESS; if 
(exec_ctx.is_end_trans_async()) { ObSQLSessionInfo* my_session = GET_MY_SESSION(exec_ctx); if (OB_ISNULL(my_session)) { ret = OB_ERR_UNEXPECTED; LOG_ERROR("session ptr is null", K(ret)); } else { ObEndTransAsyncCallback& call_back = my_session->get_end_trans_cb(); ret = explicit_end_trans(exec_ctx, is_rollback, call_back); } } else { ObEndTransSyncCallback call_back; ret = explicit_end_trans(exec_ctx, is_rollback, call_back); } return ret; } int ObSqlTransControl::explicit_end_trans( ObExecContext& exec_ctx, const bool is_rollback, ObEndTransSyncCallback& callback) { int ret = OB_SUCCESS; const bool is_explicit = true; ObSQLSessionInfo* my_session = GET_MY_SESSION(exec_ctx); if (OB_ISNULL(my_session)) { ret = OB_ERR_UNEXPECTED; LOG_ERROR("session ptr is null", K(ret)); } else { if (true == my_session->get_in_transaction()) { if (OB_FAIL(callback.init(&(my_session->get_trans_desc()), my_session))) { LOG_ERROR("fail init callback", K(ret), K(my_session->get_trans_desc())); // implicit commit, no rollback } else { int wait_ret = OB_SUCCESS; if (OB_FAIL(end_trans(exec_ctx, is_rollback, is_explicit, callback))) { LOG_WARN("fail end explicit trans", K(is_rollback), K(ret)); } if (OB_UNLIKELY(OB_SUCCESS != (wait_ret = callback.wait()))) { LOG_WARN("sync end trans callback return an error!", K(ret), K(wait_ret), K(is_rollback), K(my_session->get_trans_desc())); } ret = OB_SUCCESS != ret ? 
ret : wait_ret; } } else { my_session->reset_tx_variable(); my_session->set_early_lock_release(false); exec_ctx.set_need_disconnect(false); my_session->get_trans_desc().get_standalone_stmt_desc().reset(); } } return ret; } int ObSqlTransControl::explicit_end_trans( ObExecContext& exec_ctx, const bool is_rollback, ObEndTransAsyncCallback& callback) { int ret = OB_SUCCESS; const bool is_explicit = true; ObSQLSessionInfo* my_session = GET_MY_SESSION(exec_ctx); if (OB_ISNULL(my_session)) { ret = OB_ERR_UNEXPECTED; LOG_ERROR("session is NULL, won't call end trans!!!", K(ret), KP(my_session)); } else { if (true == my_session->get_in_transaction()) { // other fields will be and should be (re)set correctly in call back routine. if (OB_FAIL(end_trans(exec_ctx, is_rollback, is_explicit, callback))) { LOG_WARN("fail end explicit trans", K(is_rollback), K(ret)); } exec_ctx.get_trans_state().set_end_trans_executed(OB_SUCC(ret)); } else { my_session->reset_first_stmt_type(); my_session->reset_tx_variable(); my_session->set_early_lock_release(false); exec_ctx.set_need_disconnect(false); my_session->get_trans_desc().get_standalone_stmt_desc().reset(); } } return ret; } int ObSqlTransControl::kill_query_session( storage::ObPartitionService* ps, ObSQLSessionInfo& session, const ObSQLSessionState& status) { int ret = OB_SUCCESS; if (OB_ISNULL(ps)) { ret = OB_INVALID_ARGUMENT; LOG_ERROR("invalid argument for point", K(ret), K(ps), K(session)); } else if (true == session.get_in_transaction()) { const transaction::ObTransDesc& trans_desc = session.get_trans_desc(); if (OB_FAIL(ps->kill_query_session(trans_desc, status))) { LOG_WARN("fail kill query or session error", K(ret), K(trans_desc)); } } return ret; } // may concurrency with user-query, need blocking query: // - acquire session.query_lock by caller // - only rollback transaction in Transaction-Layer, don't touch SQL-Layer state int ObSqlTransControl::kill_active_trx(storage::ObPartitionService* ps, ObSQLSessionInfo* session) { 
LOG_DEBUG("kill active trx start", "session_id", session->get_sessid(), K(*session)); int ret = OB_SUCCESS; transaction::ObTransDesc& trans_desc = session->get_trans_desc(); if (trans_desc.is_valid() && !trans_desc.is_trans_end() && !trans_desc.is_trx_idle_timeout()) { if (OB_FAIL(ps->internal_kill_trans(trans_desc))) { LOG_WARN("kill active trx by end_trans fail.", K(ret), "session_id", session->get_sessid(), K(*session), K(trans_desc)); } else { LOG_INFO("kill active trx succeed, need user rollback in next stmt", "session_id", session->get_sessid(), K(*session), K(trans_desc)); } } return ret; } int ObSqlTransControl::end_trans( storage::ObPartitionService* ps, ObSQLSessionInfo* session, bool& has_called_txs_end_trans) { int ret = OB_SUCCESS; int wait_ret = OB_SUCCESS; int hook_ret = OB_SUCCESS; has_called_txs_end_trans = false; if (OB_ISNULL(ps) || OB_ISNULL(session)) { ret = OB_INVALID_ARGUMENT; LOG_ERROR("invalid argument for point", K(ret), K(ps), K(session)); } else if (true == session->get_in_transaction()) { transaction::ObTransDesc& trans_desc = session->get_trans_desc(); int64_t timeout = 0; ObEndTransSyncCallback callback; if (OB_FAIL(get_stmt_timeout_ts(*session, timeout))) { LOG_ERROR("fail to get stmt timeout ts", K(ret)); } else if (OB_FAIL(callback.init(&trans_desc, session))) { LOG_ERROR("fail init callback", K(ret), K(trans_desc)); } else { callback.set_is_need_rollback(true); callback.set_end_trans_type(ObExclusiveEndTransCallback::END_TRANS_TYPE_IMPLICIT); callback.handout(); if (OB_FAIL(ps->end_trans(true, trans_desc, callback, timeout))) { LOG_WARN("fail end trans when session terminate", K(ret), K(trans_desc), K(timeout)); } session->reset_first_stmt_type(); if (OB_UNLIKELY(OB_SUCCESS != (wait_ret = callback.wait()))) { LOG_WARN("sync end trans callback return an error!", K(ret), K(wait_ret), K(trans_desc), K(timeout)); } ret = OB_SUCCESS != ret ? 
ret : wait_ret; if (callback.is_txs_end_trans_called()) { has_called_txs_end_trans = true; } else { has_called_txs_end_trans = false; LOG_WARN("fail before trans service end trans, may disconnct", K(ret), K(session->get_sessid())); if (OB_UNLIKELY(OB_SUCCESS == ret)) { LOG_ERROR("callback before trans service end trans, but ret is OB_SUCCESS, it is BUG!!!", K(callback.is_txs_end_trans_called())); } } session->reset_tx_variable(); session->set_early_lock_release(false); } } else { session->reset_first_stmt_type(); has_called_txs_end_trans = true; } return (OB_SUCCESS != ret) ? ret : hook_ret; } int ObSqlTransControl::end_trans( ObExecContext& exec_ctx, const bool is_rollback, const bool is_explicit, ObExclusiveEndTransCallback& callback) { DEBUG_AC_TRANS(exec_ctx); int ret = OB_SUCCESS; int hook_ret = OB_SUCCESS; ObTaskExecutorCtx* executor_ctx = GET_TASK_EXECUTOR_CTX(exec_ctx); ObSQLSessionInfo* my_session = GET_MY_SESSION(exec_ctx); ObPhysicalPlanCtx* plan_ctx = GET_PHY_PLAN_CTX(exec_ctx); ObEndTransCallbackType callback_type = callback.get_callback_type(); storage::ObPartitionService* ps = NULL; int32_t xa_trans_state = transaction::ObXATransState::NON_EXISTING; if (OB_ISNULL(executor_ctx) || OB_ISNULL(my_session) || OB_ISNULL(plan_ctx) || OB_ISNULL(ps = executor_ctx->get_partition_service())) { LOG_ERROR("null ptr, won't call end trans!!!", K(executor_ctx), K(my_session), K(plan_ctx), K(ps)); ret = OB_ERR_UNEXPECTED; } else { transaction::ObTransDesc& trans_desc = my_session->get_trans_desc(); if (trans_desc.is_xa_local_trans() && !is_explicit) { if (OB_FAIL(ps->get_xa_trans_state(xa_trans_state, trans_desc))) { LOG_WARN("failed get_xa_trans_state", K(ret)); } else if (transaction::ObXATransState::NON_EXISTING != xa_trans_state) { ret = OB_TRANS_XA_RMFAIL; // LOG_USER_ERROR(OB_TRANS_XA_RMFAIL, ObXATransState::to_string(xa_trans_state)); LOG_WARN("invalid xa state", K(ret), K(xa_trans_state)); callback.set_is_need_rollback(is_rollback); 
callback.set_end_trans_type(ObExclusiveEndTransCallback::END_TRANS_TYPE_IMPLICIT); callback.handout(); callback.callback(ret); } } else { int64_t timeout = get_stmt_timeout_ts(*plan_ctx); callback.set_is_need_rollback(is_rollback); if (is_explicit) { callback.set_end_trans_type(ObExclusiveEndTransCallback::END_TRANS_TYPE_EXPLICIT); } else { callback.set_end_trans_type(ObExclusiveEndTransCallback::END_TRANS_TYPE_IMPLICIT); } callback.handout(); if (ASYNC_CALLBACK_TYPE == callback_type && OB_FAIL(inc_session_ref(my_session))) { LOG_WARN("fail to inc session ref", K(ret)); } else if (OB_FAIL(ps->end_trans(is_rollback, trans_desc, callback, timeout))) { LOG_WARN("fail end trans", K(ret), K(callback_type)); } DEBUG_TRANS_STAGE("end_trans"); my_session->reset_first_stmt_type(); if (callback.get_callback_type() != ASYNC_CALLBACK_TYPE) { my_session->reset_tx_variable(); my_session->set_early_lock_release(false); } // my_session->get_trans_desc().reset(); DEBUG_AC_TRANS(exec_ctx); if (ASYNC_CALLBACK_TYPE == callback_type) { exec_ctx.set_need_disconnect(false); } else if (!callback.is_txs_end_trans_called()) { exec_ctx.set_need_disconnect(true); LOG_WARN("fail before trans service end trans, disconnct", K(ret), K(my_session->get_sessid())); if (OB_UNLIKELY(OB_SUCCESS == ret)) { LOG_ERROR("callback before trans service end trans, but ret is OB_SUCCESS, it is BUG!!!", K(is_explicit), K(callback.is_txs_end_trans_called())); } } else { bool is_need_disconnect = false; ObSQLUtils::check_if_need_disconnect_after_end_trans(ret, is_rollback, is_explicit, is_need_disconnect); exec_ctx.set_need_disconnect(is_need_disconnect); } } } return (OB_SUCCESS != ret) ? 
ret : hook_ret; } bool ObSqlTransControl::is_weak_consistency_level(const ObConsistencyLevel& consistency_level) { return (WEAK == consistency_level || FROZEN == consistency_level); } int ObSqlTransControl::decide_trans_read_interface_specs(const char* module, const ObSQLSessionInfo& session, const stmt::StmtType& stmt_type, const stmt::StmtType& literal_stmt_type, const bool& is_contain_select_for_update, const bool& is_contain_inner_table, const ObConsistencyLevel& sql_consistency_level, const bool need_consistent_snapshot, int32_t& trans_consistency_level, int32_t& trans_consistency_type, int32_t& read_snapshot_type) { int ret = OB_SUCCESS; const char* err_msg = ""; ObConsistencyLevelAdaptor consistency_level_adaptor(sql_consistency_level); const bool is_standby_cluster = GCTX.is_standby_cluster(); const bool is_inner_sql = session.is_inner(); const bool is_from_show_stmt = (literal_stmt_type != stmt::T_NONE); const bool is_build_index_stmt = (stmt::T_BUILD_INDEX_SSTABLE == stmt_type); uint64_t tenant_id = session.get_effective_tenant_id(); const bool stmt_spec_snapshot_version_is_valid = session.has_valid_read_snapshot_version(); int32_t isolation = session.get_tx_isolation(); const bool is_serializable_trans = (ObTransIsolation::SERIALIZABLE == isolation || ObTransIsolation::REPEATABLE_READ == isolation); trans_consistency_level = (int32_t)consistency_level_adaptor.get_consistency(); trans_consistency_type = ObTransConsistencyType::CURRENT_READ; read_snapshot_type = ObTransReadSnapshotType::STATEMENT_SNAPSHOT; if (OB_UNLIKELY(!ObTransConsistencyLevel::is_valid(trans_consistency_level))) { ret = OB_ERR_UNEXPECTED; err_msg = "invalid trans consisntency level"; } else if (is_build_index_stmt) { trans_consistency_level = ObTransConsistencyLevel::WEAK; trans_consistency_type = ObTransConsistencyType::BOUNDED_STALENESS_READ; } else if (is_from_show_stmt) { if (!is_standby_cluster || OB_SYS_TENANT_ID == tenant_id) { trans_consistency_level = 
ObTransConsistencyLevel::STRONG; trans_consistency_type = ObTransConsistencyType::CURRENT_READ; } else { trans_consistency_level = ObTransConsistencyLevel::WEAK; trans_consistency_type = ObTransConsistencyType::BOUNDED_STALENESS_READ; } } else if (stmt::T_SELECT != stmt_type || (stmt::T_SELECT == stmt_type && is_contain_select_for_update)) { trans_consistency_level = ObTransConsistencyLevel::STRONG; trans_consistency_type = ObTransConsistencyType::CURRENT_READ; } else if (is_contain_inner_table) { if (is_inner_sql) { } else { if (!is_standby_cluster || OB_SYS_TENANT_ID == tenant_id) { trans_consistency_level = ObTransConsistencyLevel::STRONG; } else { } } if (ObTransConsistencyLevel::STRONG == trans_consistency_level) { trans_consistency_type = ObTransConsistencyType::CURRENT_READ; } else { trans_consistency_type = ObTransConsistencyType::BOUNDED_STALENESS_READ; } } else { int32_t cur_stmt_consistency_type = ObTransConsistencyType::UNKNOWN; const int32_t first_stmt_consistency_type = session.get_trans_consistency_type(); if (ObTransConsistencyLevel::STRONG == trans_consistency_level) { cur_stmt_consistency_type = ObTransConsistencyType::CURRENT_READ; } else { cur_stmt_consistency_type = ObTransConsistencyType::BOUNDED_STALENESS_READ; } if (ObTransConsistencyType::UNKNOWN == first_stmt_consistency_type) { trans_consistency_type = cur_stmt_consistency_type; } else { trans_consistency_type = first_stmt_consistency_type; if (OB_UNLIKELY(cur_stmt_consistency_type != first_stmt_consistency_type)) { err_msg = "current statement consistency type is different from the first statement"; } } } if (OB_SUCCESS == ret) { if (is_serializable_trans) { read_snapshot_type = ObTransReadSnapshotType::TRANSACTION_SNAPSHOT; } else { if (is_contain_inner_table || !need_consistent_snapshot) { read_snapshot_type = ObTransReadSnapshotType::PARTICIPANT_SNAPSHOT; } else { read_snapshot_type = ObTransReadSnapshotType::STATEMENT_SNAPSHOT; } } } if (OB_FAIL(ret) || OB_UNLIKELY(0 != 
STRCMP(err_msg, ""))) { LOG_WARN(err_msg, K(ret), K(tenant_id), K(module), K(trans_consistency_type), K(read_snapshot_type), K(trans_consistency_level), K(sql_consistency_level), K(is_standby_cluster), K(is_inner_sql), K(is_from_show_stmt), K(is_build_index_stmt), K(is_serializable_trans), K(is_contain_inner_table), K(stmt_spec_snapshot_version_is_valid), K(is_contain_select_for_update), K(stmt_type), K(literal_stmt_type), K(session), "trans_desc", session.get_trans_desc()); } else { LOG_DEBUG("decide_trans_read_interface_specs", K(ret), K(tenant_id), K(module), K(trans_consistency_type), K(read_snapshot_type), K(trans_consistency_level), K(sql_consistency_level), K(is_standby_cluster), K(is_inner_sql), K(is_from_show_stmt), K(is_build_index_stmt), K(is_serializable_trans), K(is_contain_inner_table), K(stmt_spec_snapshot_version_is_valid), K(is_contain_select_for_update), K(stmt_type), K(literal_stmt_type), K(session), "trans_desc", session.get_trans_desc()); } return ret; } bool ObSqlTransControl::check_fast_select_read_uncommited( transaction::ObTransDesc& trans_desc, const common::ObPartitionLeaderArray& pla) { bool read_uncommited = false; const common::ObPartitionArray& participants = trans_desc.get_participants(); for (int i = 0; !read_uncommited && i < pla.get_partitions().count(); ++i) { const ObPartitionKey& pkey = pla.get_partitions().at(i); for (int j = 0; j < participants.count(); ++j) { if (pkey == participants.at(j)) { read_uncommited = true; break; } } } return read_uncommited; } int ObSqlTransControl::update_safe_weak_read_snapshot( bool is_bounded_staleness_read, ObSQLSessionInfo& session_info, int snapshot_type, int64_t snapshot_version) { int ret = OB_SUCCESS; if (is_bounded_staleness_read) { if (ObWeakReadUtil::enable_monotonic_weak_read(session_info.get_effective_tenant_id()) && !session_info.has_valid_read_snapshot_version() && ObTransReadSnapshotType::STATEMENT_SNAPSHOT == snapshot_type) { int64_t cur_stmt_snapshot_version = snapshot_version; 
// A non-positive statement snapshot version is rejected as unexpected.
      if (OB_UNLIKELY(cur_stmt_snapshot_version <= 0)) {
        ret = OB_ERR_UNEXPECTED;
        TRANS_LOG(WARN,
            "statement snapshot version is invalid, unexpected error",
            K(ret),
            K(cur_stmt_snapshot_version),
            K(session_info),
            K(session_info.get_trans_desc()));
      } else if (OB_FAIL(session_info.update_safe_weak_read_snapshot(session_info.get_effective_tenant_id(),
                     cur_stmt_snapshot_version,
                     ObBasicSessionInfo::LAST_STMT))) {
        TRANS_LOG(WARN,
            "update safe weak read snapshot version error",
            K(ret),
            K(snapshot_type),
            K(snapshot_version),
            K(session_info),
            K(session_info.get_trans_desc()));
      } else if (REACH_TIME_INTERVAL(10 * 1000 * 1000)) {
        // success log is rate-limited (10 s if the interval unit is microseconds)
        LOG_INFO("update safe weak read snapshot version success",
            K(ret),
            "snapshot_version",
            snapshot_version,
            "session",
            session_info,
            K(session_info.get_trans_desc()));
      } else {
        // do nothing
      }
    }
  }
  return ret;
}

// Prepare the session's transaction for the statement about to execute.
//
// After validating the context and recording the statement's tables on the
// session, it dispatches on the kind of session:
//   - standalone statement: nothing further is done here;
//   - nested session: ps->start_nested_stmt(), then refresh the transaction
//     result's max sql_no;
//   - fast select: when `pla` is non-empty, only decide read_uncommited from
//     whether its partitions already participate in the transaction;
//   - otherwise: decide the read parameters (set_trans_param_), fill the
//     current ObStmtDesc and call ps->start_stmt().
// If ps->start_stmt() fails with an RPC-timeout / data-not-readable /
// partition-change error on a non-remote execution, the leaders of the
// partitions it reported unreachable are added to the session's retry info as
// invalid servers.
//
// @param exec_ctx  must provide task executor ctx, session, and a physical plan
//                  ctx holding a plan.
// @param pla       partitions touched by the statement and their leaders.
// @param is_remote whether this runs as a remote execution (default false).
int ObSqlTransControl::start_stmt(
    ObExecContext& exec_ctx, const common::ObPartitionLeaderArray& pla, bool is_remote /* = false*/)
{
  int ret = OB_SUCCESS;
  ObTaskExecutorCtx* executor_ctx = GET_TASK_EXECUTOR_CTX(exec_ctx);
  ObSQLSessionInfo* my_session = GET_MY_SESSION(exec_ctx);
  const ObPhysicalPlanCtx* phy_plan_ctx = GET_PHY_PLAN_CTX(exec_ctx);
  storage::ObPartitionService* ps = NULL;
  const ObPhysicalPlan* phy_plan = NULL;
  if (OB_ISNULL(executor_ctx) || OB_ISNULL(my_session) || OB_ISNULL(phy_plan_ctx) ||
      OB_ISNULL(ps = executor_ctx->get_partition_service()) || OB_ISNULL(phy_plan = phy_plan_ctx->get_phy_plan())) {
    ret = OB_ERR_UNEXPECTED;
    LOG_ERROR("null ptr", K(ret), K(executor_ctx), K(my_session), K(phy_plan_ctx), K(ps), K(phy_plan));
  } else if (OB_UNLIKELY(my_session->is_zombie())) {
    // session has been killed some moment ago
    ret = OB_ERR_SESSION_INTERRUPTED;
    LOG_WARN("session has been killed",
        K(ret),
        K(my_session->get_session_state()),
        K(my_session->get_sessid()),
        "proxy_sessid",
        my_session->get_proxy_sessid());
  } else if (OB_FAIL(my_session->set_cur_stmt_tables(pla.get_partitions(), phy_plan->get_stmt_type()))) {
    LOG_WARN("set cur stmt tables fail", K(ret));
  } else if (/*my_session->reuse_cur_sql_no() ||*/ my_session->is_standalone_stmt()) {
    // skip start_stmt.
  } else if (my_session->is_nested_session()) {
    ObTransDesc& trans_desc = my_session->get_trans_desc();
    TransResult& trans_result = my_session->get_trans_result();
    if (OB_FAIL(ps->start_nested_stmt(trans_desc))) {
      LOG_WARN("start nested stmt fail", K(ret), K(trans_desc));
    } else if (OB_FAIL(trans_result.set_max_sql_no(trans_desc.get_sql_no()))) {
      LOG_WARN("refresh max sql no fail", K(ret), K(trans_result), K(trans_desc));
    }
    /*
    int64_t nested_count = my_session->get_nested_count();
    int64_t max_sql_no = trans_result.get_max_sql_no();
    const ObString nested_stmt = my_session->get_current_query_string();
    LOG_INFO("start_nested_stmt", K(nested_count), K(nested_stmt), K(max_sql_no));
    */
  } else if (my_session->is_fast_select()) {
    if (pla.count() > 0) {
      ObPartitionLeaderArray out_pla;
      OZ(change_pla_info_(executor_ctx->get_table_locations(), pla, out_pla));
      OX(my_session->set_read_uncommited(check_fast_select_read_uncommited(my_session->get_trans_desc(), out_pla)));
    }
  } else if (OB_UNLIKELY(ObSQLSessionInfo::INVALID_TYPE == my_session->get_session_type())) {
    ret = OB_ERR_UNEXPECTED;
    LOG_ERROR("session type is not init", K(my_session), K(ret));
  } else if (OB_UNLIKELY(INVALID_CONSISTENCY == phy_plan_ctx->get_consistency_level())) {
    ret = OB_ERR_UNEXPECTED;
    LOG_ERROR("sql consistency level is invalid", K(ret), K(phy_plan_ctx->get_consistency_level()));
  } else {
    ObStmtParam stmt_param;
    ObPartitionArray unreachable_partitions;
    const bool is_retry_sql = (executor_ctx->get_retry_times() > 0);
    const uint64_t tenant_id = my_session->get_effective_tenant_id();
    transaction::ObTransDesc& trans_desc = my_session->get_trans_desc();
    transaction::ObStartTransParam& trans_param = trans_desc.get_trans_param();
    transaction::ObStmtDesc& stmt_desc = trans_desc.get_cur_stmt_desc();
    int32_t trans_consistency_level = ObTransConsistencyLevel::UNKNOWN;
    const int64_t last_safe_weak_read_snapshot = my_session->get_safe_weak_read_snapshot();
    const int64_t last_safe_weak_read_snapshot_source = my_session->get_safe_weak_read_snapshot_source();
    int64_t auto_spec_snapshot_version = ObTransVersion::INVALID_TRANS_VERSION;
    const bool is_trx_elr = my_session->get_early_lock_release();
    my_session->get_trans_result().clear_stmt_result();
    if (OB_FAIL(set_trans_param_(*my_session,
            *phy_plan,
            phy_plan_ctx->get_consistency_level(),
            trans_consistency_level,
            auto_spec_snapshot_version))) {
      LOG_WARN("set trans param fail", K(ret), KPC(my_session));
    } else if (OB_FAIL(stmt_param.init(tenant_id,
                   get_stmt_timeout_ts(*phy_plan_ctx),
                   is_retry_sql,
                   last_safe_weak_read_snapshot,
                   last_safe_weak_read_snapshot_source,
                   is_trx_elr))) {
      LOG_WARN("ObStmtParam init error", K(ret), K(tenant_id), K(is_retry_sql), K(last_safe_weak_read_snapshot));
    } else {
      // Describe the current statement for the transaction layer.
      stmt_desc.phy_plan_type_ = phy_plan->has_nested_sql() ? OB_PHY_PLAN_UNCERTAIN : phy_plan->get_location_type();
      stmt_desc.stmt_type_ = phy_plan->get_stmt_type();
      stmt_desc.is_sfu_ = phy_plan->has_for_update();
      stmt_desc.execution_id_ = my_session->get_current_execution_id();
      stmt_desc.sql_id_.assign_strive(phy_plan->stat_.sql_id_);
      stmt_desc.trace_id_adaptor_.set(ObCurTraceId::get());
      const ObString& app_trace_id_str = my_session->get_app_trace_id();
      // app trace id is copied into the descriptor's own buffer
      stmt_desc.app_trace_id_str_.reset();
      stmt_desc.app_trace_id_str_.assign_buffer(stmt_desc.buffer_, sizeof(stmt_desc.buffer_));
      stmt_desc.app_trace_id_str_.write(app_trace_id_str.ptr(), app_trace_id_str.length());
      stmt_desc.cur_query_start_time_ = my_session->get_query_start_time();
      stmt_desc.stmt_tenant_id_ = my_session->get_effective_tenant_id();
      stmt_desc.inner_sql_ = my_session->is_inner();
      stmt_desc.consistency_level_ = trans_consistency_level;
      stmt_desc.trx_lock_timeout_ = my_session->get_trx_lock_timeout();
      // A session-specified read snapshot wins over the one set_trans_param_
      // may have chosen automatically.
      if (my_session->has_valid_read_snapshot_version()) {
        stmt_desc.cur_stmt_specified_snapshot_version_ = my_session->get_read_snapshot_version();
      } else if (auto_spec_snapshot_version > 0) {
        stmt_desc.cur_stmt_specified_snapshot_version_ = auto_spec_snapshot_version;
      }
      /*
      const ObString root_stmt = my_session->get_current_query_string();
      LOG_INFO("start_root_stmt", K(root_stmt));
      */
      ObPartitionLeaderArray out_pla;
      if (pla.count() > 0 && OB_FAIL(change_pla_info_(executor_ctx->get_table_locations(), pla, out_pla))) {
        LOG_WARN("change pla info error", K(ret), K(stmt_param), K(trans_desc), K(pla));
      } else if (OB_FAIL(ps->start_stmt(stmt_param, trans_desc, out_pla, unreachable_partitions))) {
        LOG_WARN("fail start stmt",
            K(ret),
            K(trans_desc),
            K(pla),
            K(out_pla),
            K(is_remote),
            K(unreachable_partitions.count()));
        // Mark leaders of unreachable partitions as invalid for the retry.
        if ((is_transaction_rpc_timeout_err(ret) || is_data_not_readable_err(ret) || is_partition_change_error(ret)) &&
            !is_remote) {
          ObAddr unreachable_server;
          for (int64_t i = 0; i < unreachable_partitions.count(); ++i) {
            unreachable_server.reset();
            int tmp_ret = OB_SUCCESS;
            if (OB_UNLIKELY(
                    OB_SUCCESS != (tmp_ret = out_pla.find_leader(unreachable_partitions.at(i), unreachable_server)))) {
              LOG_ERROR("can not find unreachable server",
                  K(ret),
                  K(tmp_ret),
                  K(i),
                  K(unreachable_partitions.at(i)));
            } else if (OB_UNLIKELY(OB_SUCCESS !=
                                   (tmp_ret = my_session->get_retry_info_for_update().add_invalid_server_distinctly(
                                        unreachable_server, true)))) {
              LOG_WARN("fail to add invalid server distinctly",
                  K(ret),
                  K(tmp_ret),
                  K(unreachable_server),
                  K(my_session->get_retry_info()));
            }
          }
        }
      } else {
        OZ(update_safe_weak_read_snapshot(trans_param.is_bounded_staleness_read(),
            *my_session,
            trans_param.get_read_snapshot_type(),
            trans_desc.get_snapshot_version()));
        OZ(my_session->set_start_stmt());
        if (phy_plan->is_contain_oracle_trx_level_temporary_table()) {
          trans_desc.set_trx_level_temporary_table_involved();
        }
        DEBUG_TRANS_STAGE("start_stmt");
      }
    }
  }
  return ret;
}

// On a standby cluster, choose a statement snapshot version (the tenant's
// refreshed schema snapshot timestamp) for qualifying inner-table weak reads.
int ObSqlTransControl::specify_stmt_snapshot_version_for_slave_cluster_sql_(const ObSQLSessionInfo& session,
    const stmt::StmtType& literal_stmt_type, const bool& is_contain_inner_table,
const int32_t consistency_type, const int32_t read_snapshot_type, int64_t& auto_spec_snapshot_version) { int ret = OB_SUCCESS; share::ObSchemaStatusProxy* schema_status_proxy = GCTX.schema_status_proxy_; share::schema::ObRefreshSchemaStatus slave_schema_status; const uint64_t tenant_id = session.get_effective_tenant_id(); const bool is_standby_cluster = GCTX.is_standby_cluster(); const bool is_inner_sql = session.is_inner(); const bool is_from_show_stmt = (literal_stmt_type != stmt::T_NONE); const bool stmt_spec_snapshot_version_is_valid = session.has_valid_read_snapshot_version(); if (is_standby_cluster && is_contain_inner_table && !is_inner_sql && OB_SYS_TENANT_ID != tenant_id && !stmt_spec_snapshot_version_is_valid && ObTransConsistencyType::BOUNDED_STALENESS_READ == consistency_type) { if (OB_ISNULL(schema_status_proxy)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("invalid schema_status_proxy", K(schema_status_proxy)); } else if (OB_FAIL(schema_status_proxy->nonblock_get(tenant_id, slave_schema_status))) { LOG_WARN("get schema status on slave cluster fail", K(ret), K(tenant_id), K(session), K(is_standby_cluster), K(is_from_show_stmt), K(is_contain_inner_table), K(consistency_type), K(read_snapshot_type)); if (OB_ENTRY_NOT_EXIST == ret) { ret = OB_REPLICA_NOT_READABLE; } } else if (OB_UNLIKELY(slave_schema_status.snapshot_timestamp_ <= 0)) { ret = OB_REPLICA_NOT_READABLE; LOG_WARN("tenant schema status is invalid on slave cluster", K(ret), K(tenant_id), K(slave_schema_status), K(session), K(is_standby_cluster), K(is_from_show_stmt), K(is_contain_inner_table), K(consistency_type), K(read_snapshot_type)); } else { auto_spec_snapshot_version = slave_schema_status.snapshot_timestamp_; LOG_INFO("specify statement snapshot version for slave cluster SQL", K(tenant_id), K(slave_schema_status), K(is_standby_cluster), K(is_inner_sql), K(is_from_show_stmt), K(is_contain_inner_table), K(consistency_type), K(read_snapshot_type), K(session)); } if (OB_REPLICA_NOT_READABLE == ret) { 
LOG_WARN("slave cluster inner table readable snapshot version still not ready, need retry", K(ret), K(tenant_id), K(is_standby_cluster), K(is_inner_sql), K(is_from_show_stmt), K(consistency_type), K(read_snapshot_type), K(is_contain_inner_table), K(session)); } } else { } return ret; } int ObSqlTransControl::trans_param_compat_with_cluster_before_2200_(ObTransDesc& trans_desc) { int ret = OB_SUCCESS; if (trans_desc.get_cluster_version() < CLUSTER_VERSION_2200) { if (trans_desc.get_snapshot_version() == ObTransVersion::INVALID_TRANS_VERSION) { trans_desc.set_snapshot_version(0); } } return ret; } int ObSqlTransControl::set_trans_param_(ObSQLSessionInfo& my_session, const ObPhysicalPlan& phy_plan, const ObConsistencyLevel sql_consistency_level, int32_t& trans_consistency_level, int64_t& auto_spec_snapshot_version) { int ret = OB_SUCCESS; transaction::ObTransDesc& trans_desc = my_session.get_trans_desc(); int32_t trans_consistency_type = ObTransConsistencyType::UNKNOWN; int32_t read_snapshot_type = ObTransReadSnapshotType::UNKNOWN; if (OB_FAIL(decide_trans_read_interface_specs("ObSqlTransControl::start_stmt", my_session, phy_plan.get_stmt_type(), phy_plan.get_literal_stmt_type(), phy_plan.has_for_update(), phy_plan.is_contain_inner_table(), sql_consistency_level, phy_plan.need_consistent_snapshot(), trans_consistency_level, trans_consistency_type, read_snapshot_type))) { LOG_WARN("fail to decide trans read interface specs", K(ret), K(phy_plan.get_stmt_type()), K(phy_plan.get_literal_stmt_type()), K(phy_plan.has_for_update()), K(phy_plan.is_contain_inner_table()), K(sql_consistency_level), K(my_session)); } else if (GCTX.is_standby_cluster() && OB_FAIL(specify_stmt_snapshot_version_for_slave_cluster_sql_(my_session, phy_plan.get_literal_stmt_type(), phy_plan.is_contain_inner_table(), trans_consistency_type, read_snapshot_type, auto_spec_snapshot_version))) { LOG_WARN("specify statement snapshot version for slave cluster sql fail", K(ret), K(my_session)); } else if 
(OB_FAIL(trans_param_compat_with_cluster_before_2200_(trans_desc))) { LOG_WARN("trans param compat with cluster before 2200 fail", K(ret), K(trans_desc)); } else { trans_desc.get_trans_param().set_read_snapshot_type(read_snapshot_type); if (ObTransConsistencyType::UNKNOWN == my_session.get_trans_consistency_type()) { my_session.set_trans_consistency_type(trans_consistency_type); trans_desc.get_trans_param().set_consistency_type(trans_consistency_type); } else if (trans_desc.get_trans_param().get_consistency_type() != trans_consistency_type) { ret = OB_NOT_SUPPORTED; LOG_WARN("different consistency type in one transaction, unexpected error", K(ret), K(trans_consistency_type), K(trans_desc)); LOG_USER_ERROR(OB_NOT_SUPPORTED, "different consistency type in one transaction"); } if (OB_SUCCESS == ret) { if (ObTransConsistencyType::BOUNDED_STALENESS_READ == trans_consistency_type) { trans_desc.get_trans_param().set_access_mode(transaction::ObTransAccessMode::READ_ONLY); } } } LOG_DEBUG("ObSqlTransControl::start_stmt::set_trans_param", K(sql_consistency_level), K(trans_consistency_type), K(trans_consistency_level), K(read_snapshot_type), K(auto_spec_snapshot_version), K(trans_desc), K(my_session)); return ret; } int ObSqlTransControl::get_discard_participants(const ObPartitionArray& all_partitions, const ObPartitionArray& response_partitions, ObPartitionArray& discard_partitions) { int ret = OB_SUCCESS; if (OB_UNLIKELY(all_partitions.count() > response_partitions.count())) { for (int64_t i = 0; OB_SUCC(ret) && i < all_partitions.count(); ++i) { if (OB_UNLIKELY(!has_exist_in_array(response_partitions, all_partitions.at(i)))) { OZ(discard_partitions.push_back(all_partitions.at(i)), i, all_partitions.at(i)); } } LOG_DEBUG("get discard participants", K(ret), K(all_partitions), K(response_partitions), K(discard_partitions)); } return ret; } int ObSqlTransControl::create_savepoint(ObExecContext& exec_ctx, const ObString& sp_name) { int ret = OB_SUCCESS; ObSQLSessionInfo* 
my_session = GET_MY_SESSION(exec_ctx); ObTaskExecutorCtx* executor_ctx = GET_TASK_EXECUTOR_CTX(exec_ctx); storage::ObPartitionService* ps = NULL; if (OB_ISNULL(executor_ctx) || OB_ISNULL(my_session) || OB_ISNULL(ps = executor_ctx->get_partition_service())) { ret = OB_ERR_UNEXPECTED; LOG_ERROR("null ptr", K(ret), K(executor_ctx), K(my_session), K(ps)); } else if (OB_UNLIKELY(my_session->is_zombie())) { // session has been killed some moment ago ret = OB_ERR_SESSION_INTERRUPTED; LOG_WARN("session has been killed", K(ret), K(my_session->get_session_state()), K(my_session->get_sessid()), "proxy_sessid", my_session->get_proxy_sessid()); } else if (OB_FAIL(ps->savepoint(my_session->get_trans_desc(), sp_name))) { LOG_WARN("fail to create savepoint", K(ret), K(sp_name)); } return ret; } int ObSqlTransControl::rollback_savepoint(ObExecContext& exec_ctx, const ObString& sp_name) { int ret = OB_SUCCESS; ObSQLSessionInfo* my_session = GET_MY_SESSION(exec_ctx); ObTaskExecutorCtx* executor_ctx = GET_TASK_EXECUTOR_CTX(exec_ctx); const ObPhysicalPlanCtx* phy_plan_ctx = GET_PHY_PLAN_CTX(exec_ctx); storage::ObPartitionService* ps = NULL; transaction::ObStmtParam stmt_param; if (OB_ISNULL(executor_ctx) || OB_ISNULL(my_session) || OB_ISNULL(ps = executor_ctx->get_partition_service())) { ret = OB_ERR_UNEXPECTED; LOG_ERROR("null ptr", K(ret), K(executor_ctx), K(my_session), K(ps)); } else if (OB_UNLIKELY(my_session->is_zombie())) { // session has been killed some moment ago ret = OB_ERR_SESSION_INTERRUPTED; LOG_WARN("session has been killed", K(ret), K(my_session->get_session_state()), K(my_session->get_sessid()), "proxy_sessid", my_session->get_proxy_sessid()); } else if (OB_FAIL( stmt_param.init(my_session->get_effective_tenant_id(), get_stmt_timeout_ts(*phy_plan_ctx), false))) { LOG_WARN("ObStmtParam init error", K(ret), K(my_session)); } else if (OB_FAIL(ps->rollback_savepoint(my_session->get_trans_desc(), sp_name, stmt_param))) { LOG_WARN("fail to rollback savepoint", K(ret), 
K(sp_name), K(stmt_param)); } return ret; } int ObSqlTransControl::release_savepoint(ObExecContext& exec_ctx, const ObString& sp_name) { int ret = OB_SUCCESS; ObSQLSessionInfo* my_session = GET_MY_SESSION(exec_ctx); ObTaskExecutorCtx* executor_ctx = GET_TASK_EXECUTOR_CTX(exec_ctx); storage::ObPartitionService* ps = NULL; if (OB_ISNULL(executor_ctx) || OB_ISNULL(my_session) || OB_ISNULL(ps = executor_ctx->get_partition_service())) { ret = OB_ERR_UNEXPECTED; LOG_ERROR("null ptr", K(ret), K(executor_ctx), K(my_session), K(ps)); } else if (OB_UNLIKELY(my_session->is_zombie())) { // session has been killed some moment ago ret = OB_ERR_SESSION_INTERRUPTED; LOG_WARN("session has been killed", K(ret), K(my_session->get_session_state()), K(my_session->get_sessid()), "proxy_sessid", my_session->get_proxy_sessid()); } else if (OB_FAIL(ps->release_savepoint(my_session->get_trans_desc(), sp_name))) { LOG_WARN("fail to release savepoint", K(ret), K(sp_name)); } return ret; } int ObSqlTransControl::xa_rollback_all_changes(ObExecContext& exec_ctx) { int ret = OB_SUCCESS; ObSQLSessionInfo* my_session = GET_MY_SESSION(exec_ctx); ObTaskExecutorCtx* executor_ctx = GET_TASK_EXECUTOR_CTX(exec_ctx); const ObPhysicalPlanCtx* phy_plan_ctx = GET_PHY_PLAN_CTX(exec_ctx); storage::ObPartitionService* ps = NULL; transaction::ObStmtParam stmt_param; if (OB_ISNULL(executor_ctx) || OB_ISNULL(my_session) || OB_ISNULL(ps = executor_ctx->get_partition_service())) { ret = OB_ERR_UNEXPECTED; LOG_ERROR("null ptr", K(ret), K(executor_ctx), K(my_session), K(ps)); } else if (!my_session->is_in_transaction() || !my_session->get_trans_desc().is_xa_local_trans()) { ret = OB_INVALID_ARGUMENT; LOG_WARN("called in wrong context", K(ret), K(my_session->get_trans_desc())); } else if (OB_UNLIKELY(my_session->is_zombie())) { // session has been killed some moment ago ret = OB_ERR_SESSION_INTERRUPTED; LOG_WARN("session has been killed", K(ret), K(my_session->get_session_state()), K(my_session->get_sessid()), 
"proxy_sessid", my_session->get_proxy_sessid()); } else if (OB_FAIL( stmt_param.init(my_session->get_effective_tenant_id(), get_stmt_timeout_ts(*phy_plan_ctx), false))) { LOG_WARN("ObStmtParam init error", K(ret), K(my_session)); } else if (OB_FAIL(ps->xa_rollback_all_changes(my_session->get_trans_desc(), stmt_param))) { LOG_WARN("fail to rollback savepoint", K(ret), K(stmt_param), K(my_session->get_trans_desc())); } return ret; } int ObSqlTransControl::end_stmt(ObExecContext& exec_ctx, const bool is_rollback) { int ret = OB_SUCCESS; ObTaskExecutorCtx* executor_ctx = GET_TASK_EXECUTOR_CTX(exec_ctx); ObSQLSessionInfo* my_session = GET_MY_SESSION(exec_ctx); storage::ObPartitionService* ps = NULL; common::ObPartitionLeaderArray pla; bool skip_end_stmt = false; if (OB_ISNULL(executor_ctx) || OB_ISNULL(my_session) || OB_ISNULL(ps = executor_ctx->get_partition_service())) { LOG_WARN("can't get partition service or my_session", K(executor_ctx), K(my_session), K(ps)); ret = OB_ERR_UNEXPECTED; } else if (my_session->is_fast_select() || my_session->is_standalone_stmt()) { skip_end_stmt = true; } else if (FALSE_IT(skip_end_stmt = my_session->reuse_cur_sql_no())) { } else if (OB_UNLIKELY(my_session->is_zombie())) { // session has been killed some moment ago ret = OB_ERR_SESSION_INTERRUPTED; LOG_WARN("session has been killed", K(ret), K(my_session->get_session_state()), K(my_session->get_sessid()), "proxy_sessid", my_session->get_proxy_sessid()); } else if (OB_FAIL(get_participants( executor_ctx->get_table_locations(), pla, my_session->get_is_in_retry_for_dup_tbl()))) { LOG_WARN("get participants failed", K(ret), K(executor_ctx->get_table_locations())); } else if (OB_FAIL(merge_stmt_partitions(exec_ctx, *my_session))) { LOG_WARN("fail to merge stmt partitions", K(ret)); } else if (skip_end_stmt) { // nesetd sql will not end_stmt(). 
LOG_DEBUG("skip end_stmt", K(ret), "cur_stmt", my_session->get_current_query_string(), "trans_result", my_session->get_trans_result()); } else if (my_session->is_nested_session()) { TransResult& trans_result = my_session->get_trans_result(); ObTransDesc& trans_desc = my_session->get_trans_desc(); ObPartitionArray cur_stmt_all_pgs; if (trans_result.get_total_partitions().count() > 0 && OB_FAIL(change_pkeys_to_pgs_( executor_ctx->get_table_locations(), trans_result.get_total_partitions(), cur_stmt_all_pgs))) { LOG_WARN("change cur stmt pkeys to pgs error", K(ret), K(trans_result)); trans_desc.set_need_rollback(); } else if (OB_FAIL(ps->end_nested_stmt(trans_desc, cur_stmt_all_pgs, is_rollback))) { LOG_WARN("end nested stmt fail", K(ret), K(trans_desc), K(cur_stmt_all_pgs), K(is_rollback)); } /* int64_t nested_count = my_session->get_nested_count(); const ObString nested_stmt = my_session->get_current_query_string(); LOG_INFO("end_nested_stmt", K(nested_count), K(nested_stmt), K(trans_result), K(is_rollback)); */ } else { transaction::ObTransDesc& trans_desc = my_session->get_trans_desc(); TransResult& trans_result = my_session->get_trans_result(); ObPartitionArray discard_partitions; if (GET_MIN_CLUSTER_VERSION() >= CLUSTER_VERSION_2100 && !is_rollback) { // param discard_partitions is added in version 2.1 // in order to compatible with the old server below version 2.1 // only calculate the discard partition when all server are updated to version 2.1 OZ(get_discard_participants( trans_result.get_total_partitions(), trans_result.get_response_partitions(), discard_partitions), trans_result, discard_partitions); } ObPartitionArray cur_stmt_all_pgs; ObPartitionArray cur_stmt_discard_pgs; ObPartitionLeaderArray cur_stmt_pla; if (trans_result.get_total_partitions().count() > 0 && OB_FAIL(change_pkeys_to_pgs_( executor_ctx->get_table_locations(), trans_result.get_total_partitions(), cur_stmt_all_pgs))) { LOG_WARN("change cur stmt pkeys to pgs error", K(ret), 
K(trans_result), K(trans_desc)); trans_desc.set_need_rollback(); } else if (discard_partitions.count() > 0 && OB_FAIL(change_pkeys_to_pgs_( executor_ctx->get_table_locations(), discard_partitions, cur_stmt_discard_pgs))) { LOG_WARN("change cur stmt discard pkeys to pgs error", K(ret), K(trans_result), K(trans_desc)); trans_desc.set_need_rollback(); } else if (pla.count() > 0 && OB_FAIL(change_pla_info_(executor_ctx->get_table_locations(), pla, cur_stmt_pla))) { LOG_WARN("change pla info error", K(ret), K(pla), K(cur_stmt_pla), K(trans_result), K(trans_desc)); trans_desc.set_need_rollback(); } else { /* const ObString root_stmt = my_session->get_current_query_string(); LOG_INFO("end_root_stmt", K(root_stmt), KP(&trans_desc), "trans_desc_max_sql_no", trans_desc.get_max_sql_no(), "trans_result_max_sql_no", trans_result.get_max_sql_no()); */ OZ(ps->end_stmt(is_rollback, trans_result.is_incomplete(), cur_stmt_all_pgs, trans_result.get_part_epoch_list(), cur_stmt_discard_pgs, cur_stmt_pla, trans_desc), trans_result, discard_partitions, trans_desc); DEBUG_TRANS_STAGE("end_stmt"); } if (OB_FAIL(ret)) { LOG_WARN("call end_stmt", K(ret), K(trans_desc), "cur_stmt", my_session->get_current_query_string(), K(cur_stmt_all_pgs), K(cur_stmt_discard_pgs), K(cur_stmt_pla), K(discard_partitions), K(trans_result)); } } if (OB_NOT_NULL(my_session) && !skip_end_stmt && !my_session->is_nested_session()) { my_session->get_trans_result().clear_stmt_result(); int end_ret = my_session->set_end_stmt(); if (OB_SUCCESS != end_ret) { LOG_ERROR("failed to set end stmt", K(end_ret)); if (OB_SUCCESS == ret) { ret = end_ret; } } } return ret; } int ObSqlTransControl::start_participant( ObExecContext& exec_ctx, const common::ObPartitionArray& participants, bool is_remote /* = false*/) { int ret = OB_SUCCESS; ObTaskExecutorCtx* executor_ctx = GET_TASK_EXECUTOR_CTX(exec_ctx); ObSQLSessionInfo* my_session = GET_MY_SESSION(exec_ctx); storage::ObPartitionService* ps = NULL; const ObPhysicalPlan* phy_plan 
= NULL; if (OB_ISNULL(executor_ctx) || OB_ISNULL(my_session) || OB_ISNULL(ps = executor_ctx->get_partition_service()) || OB_ISNULL(phy_plan = GET_PHY_PLAN_CTX(exec_ctx)->get_phy_plan())) { LOG_WARN("can't get partition service or my_session", K(executor_ctx), K(my_session), K(ps)); ret = OB_ERR_UNEXPECTED; } else if (OB_UNLIKELY(my_session->is_zombie())) { // session has been killed some moment ago ret = OB_ERR_SESSION_INTERRUPTED; LOG_WARN("session has been killed", K(ret), K(my_session->get_session_state()), K(my_session->get_sessid()), "proxy_sessid", my_session->get_proxy_sessid()); } else if (my_session->is_fast_select() || my_session->is_standalone_stmt()) { // do nothing ... } else { transaction::ObTransDesc& trans_desc = my_session->get_trans_desc(); TransResult& trans_result = my_session->get_trans_result(); if (OB_FAIL(trans_result.merge_total_partitions(participants))) { LOG_WARN("fail to merge partitions", K(ret), K(participants)); } if (OB_SUCC(ret)) { ObPartitionArray pgs; ObPartitionEpochArray partition_epoch_arr; const ObTransID& trans_id = trans_desc.get_trans_id(); int64_t sql_no = trans_desc.get_sql_no(); bool is_forbidden = false; if (participants.count() > 0 && OB_FAIL(change_pkeys_to_pgs_(executor_ctx->get_table_locations(), participants, pgs))) { LOG_WARN("change pkeys to pgs error", K(ret), K(participants), K(pgs), K(trans_desc)); } else if (OB_FAIL(ps->is_trans_forbidden_sql_no(trans_id, pgs, sql_no, is_forbidden))) { LOG_WARN("get trans forbidden error", K(ret), K(participants), K(pgs), K(trans_desc)); } else if (is_forbidden) { ret = OB_ERR_INTERRUPTED; LOG_WARN("execution is interrupted", K(ret), K(participants), K(pgs), K(trans_desc)); } else if (OB_FAIL(ps->start_participant(trans_desc, pgs, partition_epoch_arr))) { if (OB_NOT_MASTER != ret) { LOG_WARN( "fail start participants", K(ret), K(is_remote), K(participants), K(pgs), K(trans_result), K(trans_desc)); } if (is_data_not_readable_err(ret) && !is_remote) { int add_ret = OB_SUCCESS; 
if (OB_UNLIKELY( OB_SUCCESS != (add_ret = my_session->get_retry_info_for_update().add_invalid_server_distinctly( exec_ctx.get_addr(), true)))) { LOG_WARN("fail to add local addr to invalid servers distinctly", K(ret), K(add_ret), K(exec_ctx.get_addr()), K(my_session->get_retry_info())); } } } else { DEBUG_TRANS_STAGE("start_participant"); LOG_DEBUG("start participant", K(participants), K(trans_result)); OZ(trans_result.merge_response_partitions(participants), participants); OZ(trans_result.merge_part_epoch_list(partition_epoch_arr), partition_epoch_arr); } NG_TRACE_EXT(start_part, OB_ID(ret), ret, OB_ID(trans_id), trans_desc.get_trans_id()); } } return ret; } int ObSqlTransControl::end_participant( ObExecContext& exec_ctx, const bool is_rollback, const common::ObPartitionArray& participants) { int ret = OB_SUCCESS; ObTaskExecutorCtx* executor_ctx = GET_TASK_EXECUTOR_CTX(exec_ctx); ObSQLSessionInfo* my_session = GET_MY_SESSION(exec_ctx); storage::ObPartitionService* ps = NULL; if (OB_ISNULL(executor_ctx) || OB_ISNULL(my_session) || OB_ISNULL(ps = executor_ctx->get_partition_service())) { LOG_WARN("can't get partition service or my_session", K(executor_ctx), K(my_session), K(ps)); ret = OB_ERR_UNEXPECTED; } else if (OB_UNLIKELY(my_session->is_zombie())) { // session has been killed some moment ago ret = OB_ERR_SESSION_INTERRUPTED; LOG_WARN("session has been killed", K(ret), K(my_session->get_session_state()), K(my_session->get_sessid()), "proxy_sessid", my_session->get_proxy_sessid()); } else if (my_session->is_fast_select() || my_session->is_standalone_stmt()) { // do nothing ... 
} else { transaction::ObTransDesc& trans_desc = my_session->get_trans_desc(); ObPartitionArray pgs; if (participants.count() > 0 && OB_FAIL(change_pkeys_to_pgs_(executor_ctx->get_table_locations(), participants, pgs))) { LOG_WARN("change pkeys to pgs error", K(ret), K(participants), K(pgs), K(trans_desc)); } else if (OB_FAIL(ps->end_participant(is_rollback, trans_desc, pgs))) { LOG_WARN("fail end participant", K(ret), K(is_rollback), K(participants), K(pgs), K(trans_desc)); } else { // do nothing DEBUG_TRANS_STAGE("end_participant"); } } NG_TRACE_EXT(end_participant, OB_ID(ret), ret); return ret; } int ObSqlTransControl::get_pg_key_( const ObIArray& table_locations, const ObPartitionKey& pkey, ObPartitionKey& pg_key) { int ret = OB_SUCCESS; bool hit = false; for (int64_t i = 0; OB_SUCC(ret) && !hit && i < table_locations.count(); ++i) { const ObPartitionReplicaLocationIArray& part_locations = table_locations.at(i).get_partition_location_list(); for (int64_t j = 0; OB_SUCC(ret) && j < part_locations.count(); ++j) { ObPartitionKey tmp_pkey; if (OB_FAIL(part_locations.at(j).get_partition_key(tmp_pkey))) { LOG_WARN("get partition key error", K(ret), "location", part_locations.at(j)); } else if (pkey == tmp_pkey) { pg_key = part_locations.at(j).get_pg_key(); hit = true; break; } else { // do nothing } } } if (OB_SUCC(ret)) { if (!hit || !pg_key.is_valid()) { LOG_DEBUG("cannot find pg key", K(ret), K(table_locations), K(pkey), K(pg_key)); if (OB_FAIL(storage::ObPartitionService::get_instance().get_pg_key(pkey, pg_key))) { LOG_WARN("get pg key error", K(ret), K(pkey), K(pg_key)); } } } return ret; } int ObSqlTransControl::change_pkeys_to_pgs_( const ObIArray& table_locations, const ObPartitionArray& pkeys, ObPartitionArray& pg_keys) { int ret = OB_SUCCESS; if (pkeys.count() <= 0) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid argument", K(ret), K(pkeys), K(pg_keys), K(table_locations)); } else { for (int64_t i = 0; OB_SUCC(ret) && i < pkeys.count(); ++i) { ObPGKey pg_key; 
if (table_locations.count() <= 0) { if (OB_FAIL(storage::ObPartitionService::get_instance().get_pg_key(pkeys.at(i), pg_key))) { LOG_WARN("get pg key error", K(ret), K(pkeys), K(pg_key)); } } else if (OB_FAIL(get_pg_key_(table_locations, pkeys.at(i), pg_key))) { LOG_WARN("get pg key error", K(ret), "key", pkeys.at(i)); } else { // do nothing } if (OB_SUCC(ret)) { int64_t j = 0; for (; j < pg_keys.count(); ++j) { if (pg_key == pg_keys.at(j)) { break; } } if (j == pg_keys.count()) { if (OB_FAIL(pg_keys.push_back(pg_key))) { LOG_WARN("pg keys push back error", K(ret), K(pg_key), K(pg_keys)); } } } } } return ret; } int ObSqlTransControl::change_pla_info_(const ObIArray& table_locations, const ObPartitionLeaderArray& pla, ObPartitionLeaderArray& out_pla) { int ret = OB_SUCCESS; if (pla.count() <= 0) { ret = OB_INVALID_ARGUMENT; LOG_WARN("invalid argument", K(ret), K(pla), K(table_locations)); } else { for (int64_t i = 0; OB_SUCC(ret) && i < pla.count(); ++i) { ObPGKey pg_key; if (table_locations.count() <= 0) { if (OB_FAIL(storage::ObPartitionService::get_instance().get_pg_key(pla.get_partitions().at(i), pg_key))) { LOG_WARN("get pg key error", K(ret), K(table_locations), K(pla)); } } else if (OB_FAIL(get_pg_key_(table_locations, pla.get_partitions().at(i), pg_key))) { LOG_WARN("get pg key error", K(ret), K(table_locations), K(pla)); } else { // do nothing } if (OB_SUCC(ret)) { const ObAddr& addr = pla.get_leaders().at(i); const common::ObPartitionType& type = pla.get_types().at(i); int64_t j = 0; for (; j < out_pla.count(); ++j) { if (pg_key == out_pla.get_partitions().at(j) && addr == out_pla.get_leaders().at(j) && type == out_pla.get_types().at(j)) { break; } } if (j == out_pla.count()) { if (OB_FAIL(out_pla.push(pg_key, addr, type))) { LOG_WARN("out pla push back error", K(pg_key), K(addr), K(type)); } } } } } return ret; } int ObSqlTransControl::get_participants( const ObIArray& table_locations, ObPartitionLeaderArray& pla, bool is_retry_for_dup_tbl) { int ret = 
OB_SUCCESS; uint64_t N = table_locations.count(); ObPartitionKey key; common::ObPartitionType type; // SQL_LOG(DEBUG, "phy table location size:", K(N)); for (uint64_t i = 0; OB_SUCC(ret) && i < N; ++i) { const ObPartitionReplicaLocationIArray& part_locations = table_locations.at(i).get_partition_location_list(); int64_t M = part_locations.count(); bool is_dup = table_locations.at(i).is_duplicate_table(); bool is_dup_not_dml = table_locations.at(i).is_duplicate_table_not_in_dml(); for (int64_t j = 0; OB_SUCC(ret) && j < M; ++j) { if (is_dup) { if (part_locations.at(j).get_replica_location().is_follower()) { type = ObPartitionType::DUPLICATE_FOLLOWER_PARTITION; } else if (is_dup_not_dml && false == is_retry_for_dup_tbl) { type = ObPartitionType::DUPLICATE_FOLLOWER_PARTITION; } else { type = ObPartitionType::DUPLICATE_LEADER_PARTITION; } } else { type = ObPartitionType::NORMAL_PARTITION; } // LOG_INFO("partition location", K(is_dup), K(type), K(is_dup_not_dml), K(part_locations.at(j))); if (OB_FAIL(part_locations.at(j).get_partition_key(key))) { LOG_WARN("failed to get partition key", K(ret)); } else if (OB_UNLIKELY(is_virtual_table(key.table_id_))) { } else if (OB_FAIL(append_participant_to_array_distinctly( pla, key, type, part_locations.at(j).get_replica_location().server_))) { /* * In the case of multiple alias table corresponding to the same physical table, the * key we get could be duplicate. To avoid having same partition as participants more * than once, we need to check if we have already added the key in the list. 
*/ LOG_WARN("fail to push back participant", K(ret), K(i), K(key), K(is_dup)); } } /* for */ } /* for */ return ret; } int ObSqlTransControl::get_participants(ObExecContext& exec_ctx, ObPartitionLeaderArray& pla) { int ret = OB_SUCCESS; ObTaskExecutorCtx* task_exec_ctx = exec_ctx.get_task_executor_ctx(); ObSQLSessionInfo* my_session = GET_MY_SESSION(exec_ctx); if (OB_ISNULL(task_exec_ctx) || OB_ISNULL(my_session)) { ret = OB_NOT_INIT; LOG_WARN("task executor ctx should not be NULL", K(my_session)); } else if (OB_FAIL(get_participants( task_exec_ctx->get_table_locations(), pla, my_session->get_is_in_retry_for_dup_tbl()))) { LOG_WARN("get participants failed", K(ret)); } return ret; } int ObSqlTransControl::append_participant_to_array_distinctly( ObPartitionLeaderArray& pla, const ObPartitionKey& key, const common::ObPartitionType type, const ObAddr& svr) { int ret = OB_SUCCESS; bool found = false; // check if the key already exists for (int64_t i = 0; !found && i < pla.count(); i++) { if (pla.get_partitions().at(i) == key && pla.get_leaders().at(i) == svr) { found = true; } } // if not, add it if (!found) { ret = pla.push(key, svr, type); } return ret; } int ObSqlTransControl::get_participants(ObExecContext& exec_ctx, common::ObPartitionArray& participants) { // DO NOT reset participants for init, see ObResultSet::merge_stmt_partitions(). 
int ret = OB_SUCCESS; ObTaskExecutorCtx& task_exec_ctx = exec_ctx.get_task_exec_ctx(); const ObIArray& table_locations = task_exec_ctx.get_table_locations(); uint64_t N = table_locations.count(); SQL_LOG(DEBUG, "phy table location size:", K(N)); for (uint64_t i = 0; OB_SUCC(ret) && i < N; ++i) { const ObPartitionReplicaLocationIArray& part_locations = table_locations.at(i).get_partition_location_list(); int64_t M = part_locations.count(); for (int64_t j = 0; OB_SUCC(ret) && j < M; ++j) { // LOG_INFO("partition location", K(part_locations.at(j))); ObPartitionKey key; if (OB_FAIL(part_locations.at(j).get_partition_key(key))) { LOG_WARN("failed to get partition key", K(ret)); } else if (OB_UNLIKELY(is_virtual_table(key.table_id_))) { } else if (OB_FAIL(append_participant_to_array_distinctly(participants, key))) { /* * In the case of multiple alias table corresponding to the same physical table, the * key we get could be duplicate. To avoid having same partition as participants more * than once, we need to check if we have already added the key in the list. 
*/ LOG_WARN("fail to push back participant", K(ret), K(i), K(j), K(key)); } } } return ret; } int ObSqlTransControl::get_root_job_participants( ObExecContext& exec_ctx, const ObPhyOperator& root_job_root_op, ObPartitionArray& participants) { int ret = OB_SUCCESS; ObTaskExecutorCtx& task_exec_ctx = exec_ctx.get_task_exec_ctx(); ObSEArray table_location_keys; OZ(ObTaskSpliter::find_all_table_location_keys(table_location_keys, root_job_root_op)); for (int64_t i = 0; OB_SUCC(ret) && i < table_location_keys.count(); ++i) { const TableLocationKey& table_location_key = table_location_keys.at(i); const ObPhyTableLocation* table_loc = NULL; OZ(ObTaskExecutorCtxUtil::get_phy_table_location( task_exec_ctx, table_location_key.table_id_, table_location_key.ref_table_id_, table_loc)); CK(OB_NOT_NULL(table_loc)); OZ(append_participant_by_table_loc(participants, *table_loc)); } return ret; } int ObSqlTransControl::get_root_job_participants( ObExecContext& exec_ctx, const ObOperator& root_job_root_op, ObPartitionArray& participants) { int ret = OB_SUCCESS; ObTaskExecutorCtx& task_exec_ctx = exec_ctx.get_task_exec_ctx(); ObSEArray table_location_keys; OZ(ObTaskSpliter::find_all_table_location_keys(table_location_keys, root_job_root_op.get_spec())); for (int64_t i = 0; OB_SUCC(ret) && i < table_location_keys.count(); ++i) { const TableLocationKey& table_location_key = table_location_keys.at(i); const ObPhyTableLocation* table_loc = NULL; OZ(ObTaskExecutorCtxUtil::get_phy_table_location( task_exec_ctx, table_location_key.table_id_, table_location_key.ref_table_id_, table_loc)); CK(OB_NOT_NULL(table_loc)); OZ(append_participant_by_table_loc(participants, *table_loc)); } return ret; } int ObSqlTransControl::append_participant_by_table_loc( ObPartitionIArray& participants, const ObPhyTableLocation& table_loc) { int ret = OB_SUCCESS; const ObPartitionReplicaLocationIArray& part_locations = table_loc.get_partition_location_list(); int64_t M = part_locations.count(); ObPartitionKey key; for 
(int64_t j = 0; OB_SUCC(ret) && j < M; ++j) { key.reset(); if (OB_FAIL(part_locations.at(j).get_partition_key(key))) { LOG_WARN("failed to get partition key", K(ret)); } else if (OB_UNLIKELY(true == is_virtual_table(key.table_id_))) { } else if (OB_FAIL(append_participant_to_array_distinctly(participants, key))) { /* * In the case of multiple alias table corresponding to the same physical table, the * key we get could be duplicate. To avoid having same partition as participants more * than once, we need to check if we have already added the key in the list. */ LOG_WARN("fail to push back participant", K(ret), K(j), K(key)); } } return ret; } int ObSqlTransControl::append_participant_to_array_distinctly( ObPartitionIArray& participants, const ObPartitionKey& key) { int ret = OB_SUCCESS; bool found = false; // check if the key already exists for (int64_t i = 0; !found && i < participants.count(); i++) { if (participants.at(i) == key) { found = true; } } // if not, add it if (!found) { ret = participants.push_back(key); } return ret; } int ObSqlTransControl::inc_session_ref(const ObSQLSessionInfo* my_session) { int ret = OB_SUCCESS; if (OB_ISNULL(GCTX.session_mgr_)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("invalid parameter", K(my_session), K(GCTX.session_mgr_), K(ret)); } else { ret = GCTX.session_mgr_->inc_session_ref(my_session); } return ret; } int ObSqlTransControl::merge_stmt_partitions(ObExecContext& exec_ctx, ObSQLSessionInfo& session) { int ret = OB_SUCCESS; ObPartitionArray total_partitions; // ObSqlTransControl::get_participants() can merge partitions. 
if (OB_FAIL(ObSqlTransControl::get_participants(exec_ctx, total_partitions))) {
    LOG_WARN("get participants failed", K(ret));
  } else if (OB_FAIL(session.get_trans_result().merge_total_partitions(total_partitions))) {
    LOG_WARN("merge total participants failed", K(ret));
  }
  return ret;
}

/**
 * Fetch statement snapshot information for `trans_desc` from the transaction service.
 *
 * The executor ctx, partition service and transaction service are checked for NULL with CK().
 * The result comes from ObTransService::get_stmt_snapshot_info().
 *
 * @param exec_ctx        execution context providing the partition service
 * @param is_cursor       forwarded to the transaction service
 * @param trans_desc      transaction descriptor
 * @param[out] snap_info  filled by the transaction service
 */
int ObSqlTransControl::get_stmt_snapshot_info(
    ObExecContext& exec_ctx, const bool is_cursor, transaction::ObTransDesc& trans_desc, ObTransSnapInfo& snap_info)
{
  int ret = OB_SUCCESS;
  ObTaskExecutorCtx* executor_ctx = GET_TASK_EXECUTOR_CTX(exec_ctx);
  storage::ObPartitionService* ps = NULL;
  transaction::ObTransService* txs = NULL;
  CK(OB_NOT_NULL(executor_ctx));
  CK(OB_NOT_NULL(ps = executor_ctx->get_partition_service()));
  CK(OB_NOT_NULL(txs = ps->get_trans_service()));
  OZ(txs->get_stmt_snapshot_info(is_cursor, trans_desc, snap_info));
  return ret;
}

/**
 * Prepare a standalone statement, i.e. a read statement executed outside of an explicit transaction.
 *
 * Steps (CK/OZ make every later step a no-op once ret has failed):
 *   1. decide consistency level/type and read snapshot type for the plan;
 *   2. on a standby cluster, possibly pick a snapshot version from the tenant schema status;
 *      a valid read snapshot version already set on the session takes precedence;
 *   3. unless the plan has nested SQL or an uncertain location type, collect participants. The
 *      statement counts as "local single partition" when there is exactly one participant, its
 *      leader is this server, and the read snapshot type is STATEMENT_SNAPSHOT;
 *   4. initialise the session's standalone statement descriptor;
 *   5. if this is a bounded-staleness read while the weak-read service is unavailable, fall back
 *      to implicit_start_trans() when plan_can_start_trans() allows it. Otherwise fetch the
 *      statement snapshot from the transaction service, and for bounded-staleness reads also
 *      update the safe weak-read snapshot.
 *
 * @param exec_ctx      execution context of the statement
 * @param session_info  session running the statement
 * @param phy_plan_ctx  physical plan context (its plan must be non-NULL)
 * @param is_remote     forwarded to implicit_start_trans() on the fallback path
 */
int ObSqlTransControl::start_standalone_stmt(
    ObExecContext& exec_ctx, ObSQLSessionInfo& session_info, ObPhysicalPlanCtx& phy_plan_ctx, bool is_remote)
{
  int ret = OB_SUCCESS;
  LOG_DEBUG("start_standalone_stmt", "session_id", session_info.get_sessid());
  ObTaskExecutorCtx* executor_ctx = GET_TASK_EXECUTOR_CTX(exec_ctx);
  storage::ObPartitionService* ps = NULL;
  transaction::ObTransService* txs = NULL;
  bool is_local_single_partition_stmt = false;
  const ObPhysicalPlan* phy_plan = phy_plan_ctx.get_phy_plan();
  transaction::ObStandaloneStmtDesc& standalone_stmt_desc = session_info.get_trans_desc().get_standalone_stmt_desc();
  int32_t consistency_level = ObTransConsistencyLevel::UNKNOWN;
  int32_t consistency_type = ObTransConsistencyType::UNKNOWN;
  int32_t read_snapshot_type = ObTransReadSnapshotType::UNKNOWN;
  int64_t auto_spec_snapshot_version = ObTransVersion::INVALID_TRANS_VERSION;
  CK(OB_NOT_NULL(executor_ctx));
  CK(OB_NOT_NULL(ps = executor_ctx->get_partition_service()));
  CK(OB_NOT_NULL(txs = ps->get_trans_service()));
  CK(OB_NOT_NULL(phy_plan));
  OZ(decide_trans_read_interface_specs("ObSqlTransControl::start_standalone_stmt",
      session_info,
      phy_plan_ctx.get_phy_plan()->get_stmt_type(),
      phy_plan_ctx.get_phy_plan()->get_literal_stmt_type(),
      phy_plan_ctx.get_phy_plan()->has_for_update(),
      phy_plan_ctx.get_phy_plan()->is_contain_inner_table(),
      phy_plan_ctx.get_consistency_level(),
      phy_plan_ctx.get_phy_plan()->need_consistent_snapshot(),
      consistency_level,
      consistency_type,
      read_snapshot_type));
  if (GCTX.is_standby_cluster()) {
    OZ(specify_stmt_snapshot_version_for_slave_cluster_sql_(session_info,
        phy_plan_ctx.get_phy_plan()->get_literal_stmt_type(),
        phy_plan_ctx.get_phy_plan()->is_contain_inner_table(),
        consistency_type,
        read_snapshot_type,
        auto_spec_snapshot_version));
  }
  // A read snapshot version set on the session overrides anything chosen above.
  if (OB_SUCC(ret) && session_info.has_valid_read_snapshot_version()) {
    auto_spec_snapshot_version = session_info.get_read_snapshot_version();
  }
  if (OB_SUCC(ret)) {
    common::ObPartitionLeaderArray in_pla;
    if (!(phy_plan->has_nested_sql() || OB_PHY_PLAN_UNCERTAIN == phy_plan->get_location_type())) {
      OZ(ObSqlTransControl::get_participants(exec_ctx, in_pla));
      if (OB_SUCC(ret)) {
        is_local_single_partition_stmt = (in_pla.count() == 1 && in_pla.get_leaders().at(0) == MYADDR &&
                                          read_snapshot_type == ObTransReadSnapshotType::STATEMENT_SNAPSHOT);
      }
    }
    OZ(standalone_stmt_desc.init(txs->get_server(),
        session_info.get_effective_tenant_id(),
        get_stmt_timeout_ts(phy_plan_ctx),
        session_info.get_trx_lock_timeout(),
        is_local_single_partition_stmt,
        consistency_type,
        read_snapshot_type,
        is_local_single_partition_stmt ? in_pla.get_partitions().at(0) : ObPartitionKey()));
  }
  if (OB_FAIL(ret)) {
  } else if (standalone_stmt_desc.is_bounded_staleness_read() && !ObWeakReadUtil::check_weak_read_service_available()) {
    // Weak-read service unavailable: fall back to a regular (implicit) transaction when allowed.
    if (ObSqlTransUtil::plan_can_start_trans(session_info.get_local_autocommit(), session_info.get_in_transaction())) {
      OZ(implicit_start_trans(exec_ctx, is_remote));
    }
  } else {
    OZ(txs->get_stmt_snapshot_info(session_info.get_trans_desc(), auto_spec_snapshot_version));
    if (standalone_stmt_desc.is_bounded_staleness_read()) {
      OZ(update_safe_weak_read_snapshot(standalone_stmt_desc.is_bounded_staleness_read(),
          session_info,
          read_snapshot_type,
          standalone_stmt_desc.get_snapshot_version()));
    }
  }
  return ret;
}

/**
 * @return true iff `isolation` is REPEATABLE_READ or SERIALIZABLE.
 */
bool ObSqlTransControl::is_isolation_RR_or_SE(int32_t isolation)
{
  return (isolation == ObTransIsolation::REPEATABLE_READ || isolation == ObTransIsolation::SERIALIZABLE);
}

/**
 * Start a cursor statement in the session's transaction via ObTransService::start_cursor_stmt(),
 * using the plan ctx's transaction timeout timestamp. All context pointers are checked with CK().
 */
int ObSqlTransControl::start_cursor_stmt(ObExecContext& exec_ctx)
{
  int ret = OB_SUCCESS;
  ObTaskExecutorCtx* executor_ctx = GET_TASK_EXECUTOR_CTX(exec_ctx);
  storage::ObPartitionService* ps = nullptr;
  transaction::ObTransService* txs = nullptr;
  ObSQLSessionInfo* session = nullptr;
  ObPhysicalPlanCtx* plan_ctx = nullptr;
  CK(OB_NOT_NULL(executor_ctx));
  CK(OB_NOT_NULL(ps = executor_ctx->get_partition_service()));
  CK(OB_NOT_NULL(txs = ps->get_trans_service()));
  CK(OB_NOT_NULL(session = exec_ctx.get_my_session()));
  CK(OB_NOT_NULL(plan_ctx = GET_PHY_PLAN_CTX(exec_ctx)));
  OZ(txs->start_cursor_stmt(session->get_trans_desc(), plan_ctx->get_trans_timeout_timestamp()));
  return ret;
}

}  // namespace sql
}  // namespace oceanbase