Files
oceanbase/src/sql/ob_sql_trans_control.cpp

1902 lines
80 KiB
C++

/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#define USING_LOG_PREFIX SQL_EXE
#include "share/ob_schema_status_proxy.h" // ObSchemaStatusProxy
#include "share/ob_encryption_util.h"
#include "sql/ob_sql_trans_control.h"
#include "sql/ob_sql_trans_hook.h"
#include "sql/engine/ob_physical_plan.h"
#include "sql/engine/ob_physical_plan_ctx.h"
#include "sql/parser/parse_malloc.h"
#include "sql/resolver/ob_stmt.h"
#include "sql/session/ob_sql_session_info.h"
#include "sql/ob_sql_trans_util.h"
#include "sql/ob_end_trans_callback.h"
#include "lib/oblog/ob_trace_log.h"
#include "storage/ob_partition_service.h"
#include "storage/transaction/ob_trans_define.h"
#include "common/ob_partition_key.h"
#include "sql/engine/ob_exec_context.h"
#include "sql/executor/ob_task_spliter.h"
#include "lib/profile/ob_perf_event.h"
#include "observer/ob_server_struct.h"
#include "observer/ob_server.h"
#include "storage/transaction/ob_weak_read_util.h" //ObWeakReadUtil
#if 0
#define DEBUG_AC_TRANS(exec_ctx) \
{ \
ObSQLSessionInfo* __my_session = GET_MY_SESSION(exec_ctx); \
OB_ASSERT(NULL != __my_session); \
bool ac = __my_session->get_autocommit(); \
bool in_trans = __my_session->get_in_transaction(); \
LOG_INFO("ac & in_trans", K(ac), K(in_trans)); \
}
#else
#define DEBUG_AC_TRANS(exec_ctx)
#endif
#if 0
#define DEBUG_TRANS_STAGE(stage) \
if (my_session->get_has_temp_table_flag()) { \
LOG_WARN("TMP_TABLE " stage, K(my_session->get_trans_desc())); \
}
#else
#define DEBUG_TRANS_STAGE(stage)
#endif
namespace oceanbase {
using namespace common;
using namespace transaction;
using namespace share;
using namespace share::schema;
namespace sql {
int ObSqlTransControl::on_plan_start(ObExecContext& exec_ctx, bool is_remote /* = false*/)
{
DEBUG_AC_TRANS(exec_ctx);
int ret = OB_SUCCESS;
bool ac = true;
ObSQLSessionInfo* my_session = GET_MY_SESSION(exec_ctx);
ObPhysicalPlanCtx* plan_ctx = GET_PHY_PLAN_CTX(exec_ctx);
if (OB_ISNULL(my_session) || OB_ISNULL(plan_ctx) || OB_ISNULL(plan_ctx->get_phy_plan())) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("session ptr or plan ctx or plan_ctx->get_phy_plan is null", K(ret), K(my_session), K(plan_ctx));
} else if (OB_UNLIKELY(my_session->is_zombie())) {
// session has been killed some moment ago
ret = OB_ERR_SESSION_INTERRUPTED;
LOG_WARN("session has been killed",
K(ret),
K(my_session->get_session_state()),
K(my_session->get_sessid()),
"proxy_sessid",
my_session->get_proxy_sessid());
} else if (OB_FAIL(my_session->get_autocommit(ac))) {
LOG_WARN("fail to get autocommit", K(ret));
} else if (stmt::T_SELECT == plan_ctx->get_phy_plan()->get_stmt_type() &&
!plan_ctx->get_phy_plan()->has_for_update() &&
(!my_session->get_trans_desc().is_valid() || my_session->get_trans_desc().is_trans_end()) &&
my_session->is_support_external_consistent() && GET_MIN_CLUSTER_VERSION() >= CLUSTER_VERSION_2260 &&
!(my_session->is_in_transaction() && my_session->get_trans_desc().is_valid() &&
my_session->get_trans_desc().is_xa_local_trans()) &&
(share::is_oracle_mode() || my_session->get_local_autocommit())) {
OZ(start_standalone_stmt(exec_ctx, *my_session, *plan_ctx, is_remote));
} else {
bool in_trans = my_session->get_in_transaction();
if (ObSqlTransUtil::plan_can_start_trans(ac, in_trans)) {
ret = implicit_start_trans(exec_ctx, is_remote);
}
}
return ret;
}
/*int ObSqlTransControl::on_plan_end(ObExecContext &exec_ctx,
bool is_rollback,
ObExclusiveEndTransCallback &callback)
{
DEBUG_AC_TRANS(exec_ctx);
int ret = OB_SUCCESS;
ObSQLSessionInfo *my_session = GET_MY_SESSION(exec_ctx);
bool ac = true;
if (OB_ISNULL(my_session)) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("session ptr is null", K(ret));
} else if (OB_UNLIKELY(my_session->is_zombie())) {
//session has been killed some moment ago
ret = OB_ERR_SESSION_INTERRUPTED;
LOG_WARN("session has been killed", K(ret), K(my_session->get_session_state()),
K(my_session->get_sessid()), "proxy_sessid", my_session->get_proxy_sessid());
} else if (OB_FAIL(my_session->get_autocommit(ac))) {
LOG_WARN("fail to get autocommit", K(ret));
} else {
bool in_trans = my_session->get_in_transaction();
if (ObSqlTransUtil::plan_can_end_trans(ac, in_trans)) {
ret = implicit_end_trans(exec_ctx, is_rollback, callback);
}
}
return ret;
}*/
int ObSqlTransControl::explicit_start_trans(ObExecContext& ctx, const bool read_only)
{
int ret = OB_SUCCESS;
ObPhysicalPlanCtx* plan_ctx = GET_PHY_PLAN_CTX(ctx);
ObSQLSessionInfo* my_session = GET_MY_SESSION(ctx);
ObTaskExecutorCtx& task_exec_ctx = ctx.get_task_exec_ctx();
if (OB_ISNULL(plan_ctx) || OB_ISNULL(my_session) || OB_UNLIKELY(!task_exec_ctx.min_cluster_version_is_valid())) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("invalid param", K(ret), K(plan_ctx), K(my_session), K(task_exec_ctx.get_min_cluster_version()));
} else {
int32_t access_mode = read_only ? ObTransAccessMode::READ_ONLY : ObTransAccessMode::READ_WRITE;
int32_t tx_isolation = my_session->get_tx_isolation();
ObStartTransParam& start_trans_param = plan_ctx->get_start_trans_param();
start_trans_param.set_access_mode(access_mode);
start_trans_param.set_isolation(tx_isolation);
start_trans_param.set_type(my_session->get_trans_type());
start_trans_param.set_cluster_version(task_exec_ctx.get_min_cluster_version());
start_trans_param.set_inner_trans(my_session->is_inner());
NG_TRACE_EXT(start_trans, OB_ID(read_only), read_only, OB_ID(access_mode), access_mode);
if (OB_FAIL(ObSqlTransControl::explicit_start_trans(ctx, start_trans_param))) {
LOG_WARN("fail start trans", K(ret));
}
}
return ret;
}
int ObSqlTransControl::explicit_start_trans(ObExecContext& exec_ctx, transaction::ObStartTransParam& start_trans_param)
{
int ret = OB_SUCCESS;
ObSQLSessionInfo* my_session = GET_MY_SESSION(exec_ctx);
if (OB_ISNULL(my_session)) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("session ptr is null", K(ret));
} else if (OB_UNLIKELY(my_session->is_zombie())) {
// session has been killed some moment ago
ret = OB_ERR_SESSION_INTERRUPTED;
LOG_WARN("session has been killed",
K(ret),
K(my_session->get_session_state()),
K(my_session->get_sessid()),
"proxy_sessid",
my_session->get_proxy_sessid());
} else {
if (false == my_session->get_in_transaction()) {
if (OB_FAIL(start_trans(exec_ctx, start_trans_param))) {
LOG_WARN("fail start explicit trans", K(ret));
} else {
bool is_read_only =
(start_trans_param.get_access_mode() == transaction::ObTransAccessMode::READ_ONLY) ? true : false;
my_session->set_tx_read_only(is_read_only);
my_session->set_in_transaction(true);
}
}
}
return ret;
}
int ObSqlTransControl::implicit_start_trans(ObExecContext& exec_ctx, bool is_remote /* = false*/)
{
int ret = OB_SUCCESS;
ObPhysicalPlanCtx* plan_ctx = GET_PHY_PLAN_CTX(exec_ctx);
ObSQLSessionInfo* my_session = GET_MY_SESSION(exec_ctx);
ObTaskExecutorCtx& task_exec_ctx = exec_ctx.get_task_exec_ctx();
bool ac = true;
if (OB_ISNULL(plan_ctx) || OB_ISNULL(my_session) || OB_ISNULL(plan_ctx->get_phy_plan())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("can't get plan_ctx or session or phy_plan", K(ret), K(plan_ctx), K(my_session));
} else if (OB_UNLIKELY(!is_remote && !task_exec_ctx.min_cluster_version_is_valid())) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("is not remote start trans, but task_exec_ctx.get_min_cluster_version() is invalid",
K(ret),
K(task_exec_ctx.get_min_cluster_version()));
} else if (OB_UNLIKELY(my_session->is_zombie())) {
// session has been killed some moment ago
ret = OB_ERR_SESSION_INTERRUPTED;
LOG_WARN("session has been killed",
K(ret),
K(my_session->get_session_state()),
K(my_session->get_sessid()),
"proxy_sessid",
my_session->get_proxy_sessid());
} else if (OB_FAIL(my_session->get_autocommit(ac))) {
LOG_WARN("fail to get autocommit", K(ret));
} else {
bool in_trans = my_session->get_in_transaction();
bool tx_read_only = my_session->get_tx_read_only();
int32_t tx_isolation = my_session->get_tx_isolation();
transaction::ObStartTransParam& start_trans_param = plan_ctx->get_start_trans_param();
if (true == ac && false == in_trans && stmt::T_SELECT == plan_ctx->get_phy_plan()->get_stmt_type() &&
!plan_ctx->get_phy_plan()->has_for_update()) {
start_trans_param.set_access_mode(transaction::ObTransAccessMode::READ_ONLY);
} else if (true == tx_read_only) {
start_trans_param.set_access_mode(transaction::ObTransAccessMode::READ_ONLY);
} else {
start_trans_param.set_access_mode(transaction::ObTransAccessMode::READ_WRITE);
}
start_trans_param.set_type(my_session->get_trans_type());
start_trans_param.set_isolation(tx_isolation);
start_trans_param.set_autocommit(ac && !in_trans);
if (task_exec_ctx.min_cluster_version_is_valid()) {
start_trans_param.set_cluster_version(task_exec_ctx.get_min_cluster_version());
} else {
start_trans_param.set_cluster_version(GET_MIN_CLUSTER_VERSION());
}
start_trans_param.set_inner_trans(my_session->is_inner());
if (OB_FAIL(start_trans(exec_ctx, start_trans_param, is_remote))) {
LOG_WARN("fail start trans", K(ret));
}
if (OB_SUCC(ret) && false == ac) {
my_session->set_in_transaction(true);
}
}
return ret;
}
int ObSqlTransControl::start_trans(
ObExecContext& exec_ctx, transaction::ObStartTransParam& start_trans_param, bool is_remote /* = false*/)
{
DEBUG_AC_TRANS(exec_ctx);
int ret = OB_SUCCESS;
int64_t org_cluster_id = OB_INVALID_ORG_CLUSTER_ID;
ObSQLSessionInfo* my_session = GET_MY_SESSION(exec_ctx);
ObTaskExecutorCtx* executor_ctx = GET_TASK_EXECUTOR_CTX(exec_ctx);
ObPhysicalPlanCtx* plan_ctx = exec_ctx.get_physical_plan_ctx();
storage::ObPartitionService* ps = NULL;
if (OB_ISNULL(executor_ctx) || OB_ISNULL(my_session) || OB_ISNULL(plan_ctx) ||
OB_ISNULL(ps = executor_ctx->get_partition_service())) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("null ptr", K(ret), K(executor_ctx), K(my_session), K(plan_ctx), K(ps));
} else if (OB_UNLIKELY(my_session->is_zombie())) {
// session has been killed some moment ago
ret = OB_ERR_SESSION_INTERRUPTED;
LOG_WARN("session has been killed",
K(ret),
K(my_session->get_session_state()),
K(my_session->get_sessid()),
"proxy_sessid",
my_session->get_proxy_sessid());
} else if (OB_FAIL(my_session->get_ob_org_cluster_id(org_cluster_id))) {
if (OB_LIKELY(is_remote && OB_ENTRY_NOT_EXIST == ret)) {
LOG_WARN("start trans in remote, ob_org_cluster_id is not serilized to the remote server, "
"maybe server version is different, then ignore it, "
"use default cluster id and set ret to OB_SUCCESS");
ret = OB_SUCCESS;
org_cluster_id = ObServerConfig::get_instance().cluster_id;
} else {
LOG_WARN("fail to get ob_org_cluster_id", K(ret));
}
}
if (OB_FAIL(ret)) {
} else {
if (OB_INVALID_ORG_CLUSTER_ID == org_cluster_id || OB_INVALID_CLUSTER_ID == org_cluster_id) {
org_cluster_id = ObServerConfig::get_instance().cluster_id;
if (OB_UNLIKELY(org_cluster_id < OB_MIN_CLUSTER_ID || org_cluster_id > OB_MAX_CLUSTER_ID)) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("org_cluster_id is set to cluster_id, but it is out of range",
K(ret),
K(org_cluster_id),
K(OB_MIN_CLUSTER_ID),
K(OB_MAX_CLUSTER_ID));
}
}
if (OB_SUCC(ret)) {
my_session->get_trans_result().reset();
transaction::ObTransDesc& trans_desc = my_session->get_trans_desc();
const uint64_t tenant_id = my_session->get_effective_tenant_id();
int64_t trans_timeout_ts = 0;
if (OB_FAIL(get_trans_timeout_ts(*my_session, trans_timeout_ts))) {
LOG_ERROR("fail to get trans timeout ts", K(ret));
} else if (OB_FAIL(ps->start_trans(tenant_id,
static_cast<uint64_t>(org_cluster_id),
start_trans_param,
trans_timeout_ts,
my_session->get_sessid(),
my_session->get_proxy_sessid(),
trans_desc))) {
LOG_WARN("fail start trans", K(ret));
}
DEBUG_TRANS_STAGE("start_trans");
NG_TRACE_EXT(start_trans,
OB_ID(trans_id),
trans_desc.get_trans_id(),
OB_ID(timeout),
trans_timeout_ts,
OB_ID(start_time),
my_session->get_query_start_time());
}
}
return ret;
}
int ObSqlTransControl::implicit_end_trans(
ObExecContext& exec_ctx, const bool is_rollback, ObExclusiveEndTransCallback& callback)
{
int ret = OB_SUCCESS;
const bool is_explicit = false;
ObSQLSessionInfo* my_session = GET_MY_SESSION(exec_ctx);
if (OB_ISNULL(my_session)) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("session ptr is null, won't call end trans!!!", K(ret));
} else if (OB_FAIL(end_trans(exec_ctx, is_rollback, is_explicit, callback))) {
LOG_WARN("fail to end trans", K(ret), K(is_rollback));
}
return ret;
}
int ObSqlTransControl::explicit_end_trans(ObExecContext& exec_ctx, const bool is_rollback)
{
int ret = OB_SUCCESS;
if (exec_ctx.is_end_trans_async()) {
ObSQLSessionInfo* my_session = GET_MY_SESSION(exec_ctx);
if (OB_ISNULL(my_session)) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("session ptr is null", K(ret));
} else {
ObEndTransAsyncCallback& call_back = my_session->get_end_trans_cb();
ret = explicit_end_trans(exec_ctx, is_rollback, call_back);
}
} else {
ObEndTransSyncCallback call_back;
ret = explicit_end_trans(exec_ctx, is_rollback, call_back);
}
return ret;
}
int ObSqlTransControl::explicit_end_trans(
ObExecContext& exec_ctx, const bool is_rollback, ObEndTransSyncCallback& callback)
{
int ret = OB_SUCCESS;
const bool is_explicit = true;
ObSQLSessionInfo* my_session = GET_MY_SESSION(exec_ctx);
if (OB_ISNULL(my_session)) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("session ptr is null", K(ret));
} else {
if (true == my_session->get_in_transaction()) {
if (OB_FAIL(callback.init(&(my_session->get_trans_desc()), my_session))) {
LOG_ERROR("fail init callback", K(ret), K(my_session->get_trans_desc()));
// implicit commit, no rollback
} else {
int wait_ret = OB_SUCCESS;
if (OB_FAIL(end_trans(exec_ctx, is_rollback, is_explicit, callback))) {
LOG_WARN("fail end explicit trans", K(is_rollback), K(ret));
}
if (OB_UNLIKELY(OB_SUCCESS != (wait_ret = callback.wait()))) {
LOG_WARN("sync end trans callback return an error!",
K(ret),
K(wait_ret),
K(is_rollback),
K(my_session->get_trans_desc()));
}
ret = OB_SUCCESS != ret ? ret : wait_ret;
}
} else {
my_session->reset_tx_variable();
my_session->set_early_lock_release(false);
exec_ctx.set_need_disconnect(false);
my_session->get_trans_desc().get_standalone_stmt_desc().reset();
}
}
return ret;
}
int ObSqlTransControl::explicit_end_trans(
ObExecContext& exec_ctx, const bool is_rollback, ObEndTransAsyncCallback& callback)
{
int ret = OB_SUCCESS;
const bool is_explicit = true;
ObSQLSessionInfo* my_session = GET_MY_SESSION(exec_ctx);
if (OB_ISNULL(my_session)) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("session is NULL, won't call end trans!!!", K(ret), KP(my_session));
} else {
if (true == my_session->get_in_transaction()) {
// other fields will be and should be (re)set correctly in call back routine.
if (OB_FAIL(end_trans(exec_ctx, is_rollback, is_explicit, callback))) {
LOG_WARN("fail end explicit trans", K(is_rollback), K(ret));
}
exec_ctx.get_trans_state().set_end_trans_executed(OB_SUCC(ret));
} else {
my_session->reset_first_stmt_type();
my_session->reset_tx_variable();
my_session->set_early_lock_release(false);
exec_ctx.set_need_disconnect(false);
my_session->get_trans_desc().get_standalone_stmt_desc().reset();
}
}
return ret;
}
int ObSqlTransControl::kill_query_session(
storage::ObPartitionService* ps, ObSQLSessionInfo& session, const ObSQLSessionState& status)
{
int ret = OB_SUCCESS;
if (OB_ISNULL(ps)) {
ret = OB_INVALID_ARGUMENT;
LOG_ERROR("invalid argument for point", K(ret), K(ps), K(session));
} else if (true == session.get_in_transaction()) {
const transaction::ObTransDesc& trans_desc = session.get_trans_desc();
if (OB_FAIL(ps->kill_query_session(trans_desc, status))) {
LOG_WARN("fail kill query or session error", K(ret), K(trans_desc));
}
}
return ret;
}
// may concurrency with user-query, need blocking query:
// - acquire session.query_lock by caller
// - only rollback transaction in Transaction-Layer, don't touch SQL-Layer state
int ObSqlTransControl::kill_active_trx(storage::ObPartitionService* ps, ObSQLSessionInfo* session)
{
LOG_DEBUG("kill active trx start", "session_id", session->get_sessid(), K(*session));
int ret = OB_SUCCESS;
transaction::ObTransDesc& trans_desc = session->get_trans_desc();
if (trans_desc.is_valid() && !trans_desc.is_trans_end() && !trans_desc.is_trx_idle_timeout()) {
if (OB_FAIL(ps->internal_kill_trans(trans_desc))) {
LOG_WARN("kill active trx by end_trans fail.",
K(ret),
"session_id",
session->get_sessid(),
K(*session),
K(trans_desc));
} else {
LOG_INFO("kill active trx succeed, need user rollback in next stmt",
"session_id",
session->get_sessid(),
K(*session),
K(trans_desc));
}
}
return ret;
}
int ObSqlTransControl::end_trans(
storage::ObPartitionService* ps, ObSQLSessionInfo* session, bool& has_called_txs_end_trans)
{
int ret = OB_SUCCESS;
int wait_ret = OB_SUCCESS;
int hook_ret = OB_SUCCESS;
has_called_txs_end_trans = false;
if (OB_ISNULL(ps) || OB_ISNULL(session)) {
ret = OB_INVALID_ARGUMENT;
LOG_ERROR("invalid argument for point", K(ret), K(ps), K(session));
} else if (true == session->get_in_transaction()) {
transaction::ObTransDesc& trans_desc = session->get_trans_desc();
int64_t timeout = 0;
ObEndTransSyncCallback callback;
if (OB_FAIL(get_stmt_timeout_ts(*session, timeout))) {
LOG_ERROR("fail to get stmt timeout ts", K(ret));
} else if (OB_FAIL(callback.init(&trans_desc, session))) {
LOG_ERROR("fail init callback", K(ret), K(trans_desc));
} else {
callback.set_is_need_rollback(true);
callback.set_end_trans_type(ObExclusiveEndTransCallback::END_TRANS_TYPE_IMPLICIT);
callback.handout();
if (OB_FAIL(ps->end_trans(true, trans_desc, callback, timeout))) {
LOG_WARN("fail end trans when session terminate", K(ret), K(trans_desc), K(timeout));
}
session->reset_first_stmt_type();
if (OB_UNLIKELY(OB_SUCCESS != (wait_ret = callback.wait()))) {
LOG_WARN("sync end trans callback return an error!", K(ret), K(wait_ret), K(trans_desc), K(timeout));
}
ret = OB_SUCCESS != ret ? ret : wait_ret;
if (callback.is_txs_end_trans_called()) {
has_called_txs_end_trans = true;
} else {
has_called_txs_end_trans = false;
LOG_WARN("fail before trans service end trans, may disconnct", K(ret), K(session->get_sessid()));
if (OB_UNLIKELY(OB_SUCCESS == ret)) {
LOG_ERROR("callback before trans service end trans, but ret is OB_SUCCESS, it is BUG!!!",
K(callback.is_txs_end_trans_called()));
}
}
session->reset_tx_variable();
session->set_early_lock_release(false);
}
} else {
session->reset_first_stmt_type();
has_called_txs_end_trans = true;
}
return (OB_SUCCESS != ret) ? ret : hook_ret;
}
int ObSqlTransControl::end_trans(
ObExecContext& exec_ctx, const bool is_rollback, const bool is_explicit, ObExclusiveEndTransCallback& callback)
{
DEBUG_AC_TRANS(exec_ctx);
int ret = OB_SUCCESS;
int hook_ret = OB_SUCCESS;
ObTaskExecutorCtx* executor_ctx = GET_TASK_EXECUTOR_CTX(exec_ctx);
ObSQLSessionInfo* my_session = GET_MY_SESSION(exec_ctx);
ObPhysicalPlanCtx* plan_ctx = GET_PHY_PLAN_CTX(exec_ctx);
ObEndTransCallbackType callback_type = callback.get_callback_type();
storage::ObPartitionService* ps = NULL;
int32_t xa_trans_state = transaction::ObXATransState::NON_EXISTING;
if (OB_ISNULL(executor_ctx) || OB_ISNULL(my_session) || OB_ISNULL(plan_ctx) ||
OB_ISNULL(ps = executor_ctx->get_partition_service())) {
LOG_ERROR("null ptr, won't call end trans!!!", K(executor_ctx), K(my_session), K(plan_ctx), K(ps));
ret = OB_ERR_UNEXPECTED;
} else {
transaction::ObTransDesc& trans_desc = my_session->get_trans_desc();
if (trans_desc.is_xa_local_trans() && !is_explicit) {
if (OB_FAIL(ps->get_xa_trans_state(xa_trans_state, trans_desc))) {
LOG_WARN("failed get_xa_trans_state", K(ret));
} else if (transaction::ObXATransState::NON_EXISTING != xa_trans_state) {
ret = OB_TRANS_XA_RMFAIL;
// LOG_USER_ERROR(OB_TRANS_XA_RMFAIL, ObXATransState::to_string(xa_trans_state));
LOG_WARN("invalid xa state", K(ret), K(xa_trans_state));
callback.set_is_need_rollback(is_rollback);
callback.set_end_trans_type(ObExclusiveEndTransCallback::END_TRANS_TYPE_IMPLICIT);
callback.handout();
callback.callback(ret);
}
} else {
int64_t timeout = get_stmt_timeout_ts(*plan_ctx);
callback.set_is_need_rollback(is_rollback);
if (is_explicit) {
callback.set_end_trans_type(ObExclusiveEndTransCallback::END_TRANS_TYPE_EXPLICIT);
} else {
callback.set_end_trans_type(ObExclusiveEndTransCallback::END_TRANS_TYPE_IMPLICIT);
}
callback.handout();
if (ASYNC_CALLBACK_TYPE == callback_type && OB_FAIL(inc_session_ref(my_session))) {
LOG_WARN("fail to inc session ref", K(ret));
} else if (OB_FAIL(ps->end_trans(is_rollback, trans_desc, callback, timeout))) {
LOG_WARN("fail end trans", K(ret), K(callback_type));
}
DEBUG_TRANS_STAGE("end_trans");
my_session->reset_first_stmt_type();
if (callback.get_callback_type() != ASYNC_CALLBACK_TYPE) {
my_session->reset_tx_variable();
my_session->set_early_lock_release(false);
}
// my_session->get_trans_desc().reset();
DEBUG_AC_TRANS(exec_ctx);
if (ASYNC_CALLBACK_TYPE == callback_type) {
exec_ctx.set_need_disconnect(false);
} else if (!callback.is_txs_end_trans_called()) {
exec_ctx.set_need_disconnect(true);
LOG_WARN("fail before trans service end trans, disconnct", K(ret), K(my_session->get_sessid()));
if (OB_UNLIKELY(OB_SUCCESS == ret)) {
LOG_ERROR("callback before trans service end trans, but ret is OB_SUCCESS, it is BUG!!!",
K(is_explicit),
K(callback.is_txs_end_trans_called()));
}
} else {
bool is_need_disconnect = false;
ObSQLUtils::check_if_need_disconnect_after_end_trans(ret, is_rollback, is_explicit, is_need_disconnect);
exec_ctx.set_need_disconnect(is_need_disconnect);
}
}
}
return (OB_SUCCESS != ret) ? ret : hook_ret;
}
bool ObSqlTransControl::is_weak_consistency_level(const ObConsistencyLevel& consistency_level)
{
return (WEAK == consistency_level || FROZEN == consistency_level);
}
int ObSqlTransControl::decide_trans_read_interface_specs(const char* module, const ObSQLSessionInfo& session,
const stmt::StmtType& stmt_type, const stmt::StmtType& literal_stmt_type, const bool& is_contain_select_for_update,
const bool& is_contain_inner_table, const ObConsistencyLevel& sql_consistency_level,
const bool need_consistent_snapshot, int32_t& trans_consistency_level, int32_t& trans_consistency_type,
int32_t& read_snapshot_type)
{
int ret = OB_SUCCESS;
const char* err_msg = "";
ObConsistencyLevelAdaptor consistency_level_adaptor(sql_consistency_level);
const bool is_standby_cluster = GCTX.is_standby_cluster();
const bool is_inner_sql = session.is_inner();
const bool is_from_show_stmt = (literal_stmt_type != stmt::T_NONE);
const bool is_build_index_stmt = (stmt::T_BUILD_INDEX_SSTABLE == stmt_type);
uint64_t tenant_id = session.get_effective_tenant_id();
const bool stmt_spec_snapshot_version_is_valid = session.has_valid_read_snapshot_version();
int32_t isolation = session.is_in_transaction() ? session.get_trans_desc().get_trans_param().get_isolation()
: session.get_tx_isolation();
const bool is_serializable_trans =
(ObTransIsolation::SERIALIZABLE == isolation || ObTransIsolation::REPEATABLE_READ == isolation);
trans_consistency_level = (int32_t)consistency_level_adaptor.get_consistency();
trans_consistency_type = ObTransConsistencyType::CURRENT_READ;
read_snapshot_type = ObTransReadSnapshotType::STATEMENT_SNAPSHOT;
if (OB_UNLIKELY(!ObTransConsistencyLevel::is_valid(trans_consistency_level))) {
ret = OB_ERR_UNEXPECTED;
err_msg = "invalid trans consisntency level";
} else if (is_build_index_stmt) {
trans_consistency_level = ObTransConsistencyLevel::WEAK;
trans_consistency_type = ObTransConsistencyType::BOUNDED_STALENESS_READ;
} else if (is_from_show_stmt) {
if (!is_standby_cluster || OB_SYS_TENANT_ID == tenant_id) {
trans_consistency_level = ObTransConsistencyLevel::STRONG;
trans_consistency_type = ObTransConsistencyType::CURRENT_READ;
} else {
trans_consistency_level = ObTransConsistencyLevel::WEAK;
trans_consistency_type = ObTransConsistencyType::BOUNDED_STALENESS_READ;
}
} else if (stmt::T_SELECT != stmt_type || (stmt::T_SELECT == stmt_type && is_contain_select_for_update)) {
trans_consistency_level = ObTransConsistencyLevel::STRONG;
trans_consistency_type = ObTransConsistencyType::CURRENT_READ;
} else if (is_contain_inner_table) {
if (is_inner_sql) {
} else {
if (!is_standby_cluster || OB_SYS_TENANT_ID == tenant_id) {
trans_consistency_level = ObTransConsistencyLevel::STRONG;
} else {
}
}
if (ObTransConsistencyLevel::STRONG == trans_consistency_level) {
trans_consistency_type = ObTransConsistencyType::CURRENT_READ;
} else {
trans_consistency_type = ObTransConsistencyType::BOUNDED_STALENESS_READ;
}
} else {
int32_t cur_stmt_consistency_type = ObTransConsistencyType::UNKNOWN;
const int32_t first_stmt_consistency_type = session.get_trans_consistency_type();
if (ObTransConsistencyLevel::STRONG == trans_consistency_level) {
cur_stmt_consistency_type = ObTransConsistencyType::CURRENT_READ;
} else {
cur_stmt_consistency_type = ObTransConsistencyType::BOUNDED_STALENESS_READ;
}
if (ObTransConsistencyType::UNKNOWN == first_stmt_consistency_type) {
trans_consistency_type = cur_stmt_consistency_type;
} else {
trans_consistency_type = first_stmt_consistency_type;
if (OB_UNLIKELY(cur_stmt_consistency_type != first_stmt_consistency_type)) {
err_msg = "current statement consistency type is different from the first statement";
}
}
}
if (OB_SUCCESS == ret) {
if (is_serializable_trans) {
read_snapshot_type = ObTransReadSnapshotType::TRANSACTION_SNAPSHOT;
} else {
if (is_contain_inner_table || !need_consistent_snapshot) {
read_snapshot_type = ObTransReadSnapshotType::PARTICIPANT_SNAPSHOT;
} else {
read_snapshot_type = ObTransReadSnapshotType::STATEMENT_SNAPSHOT;
}
}
}
if (OB_FAIL(ret) || OB_UNLIKELY(0 != STRCMP(err_msg, ""))) {
LOG_WARN(err_msg,
K(ret),
K(tenant_id),
K(module),
K(trans_consistency_type),
K(read_snapshot_type),
K(trans_consistency_level),
K(sql_consistency_level),
K(is_standby_cluster),
K(is_inner_sql),
K(is_from_show_stmt),
K(is_build_index_stmt),
K(is_serializable_trans),
K(is_contain_inner_table),
K(stmt_spec_snapshot_version_is_valid),
K(is_contain_select_for_update),
K(stmt_type),
K(literal_stmt_type),
K(session),
"trans_desc",
session.get_trans_desc());
} else {
LOG_DEBUG("decide_trans_read_interface_specs",
K(ret),
K(tenant_id),
K(module),
K(trans_consistency_type),
K(read_snapshot_type),
K(trans_consistency_level),
K(sql_consistency_level),
K(is_standby_cluster),
K(is_inner_sql),
K(is_from_show_stmt),
K(is_build_index_stmt),
K(is_serializable_trans),
K(is_contain_inner_table),
K(stmt_spec_snapshot_version_is_valid),
K(is_contain_select_for_update),
K(stmt_type),
K(literal_stmt_type),
K(session),
"trans_desc",
session.get_trans_desc());
}
return ret;
}
bool ObSqlTransControl::check_fast_select_read_uncommited(
transaction::ObTransDesc& trans_desc, const common::ObPartitionLeaderArray& pla)
{
bool read_uncommited = false;
const common::ObPartitionArray& participants = trans_desc.get_participants();
for (int i = 0; !read_uncommited && i < pla.get_partitions().count(); ++i) {
const ObPartitionKey& pkey = pla.get_partitions().at(i);
for (int j = 0; j < participants.count(); ++j) {
if (pkey == participants.at(j)) {
read_uncommited = true;
break;
}
}
}
return read_uncommited;
}
int ObSqlTransControl::update_safe_weak_read_snapshot(
bool is_bounded_staleness_read, ObSQLSessionInfo& session_info, int snapshot_type, int64_t snapshot_version)
{
int ret = OB_SUCCESS;
if (is_bounded_staleness_read) {
if (ObWeakReadUtil::enable_monotonic_weak_read(session_info.get_effective_tenant_id()) &&
!session_info.has_valid_read_snapshot_version() &&
ObTransReadSnapshotType::STATEMENT_SNAPSHOT == snapshot_type) {
int64_t cur_stmt_snapshot_version = snapshot_version;
if (OB_UNLIKELY(cur_stmt_snapshot_version <= 0)) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(WARN,
"statement snapshot version is invalid, unexpected error",
K(ret),
K(cur_stmt_snapshot_version),
K(session_info),
K(session_info.get_trans_desc()));
} else if (OB_FAIL(session_info.update_safe_weak_read_snapshot(session_info.get_effective_tenant_id(),
cur_stmt_snapshot_version,
ObBasicSessionInfo::LAST_STMT))) {
TRANS_LOG(WARN,
"update safe weak read snapshot version error",
K(ret),
K(snapshot_type),
K(snapshot_version),
K(session_info),
K(session_info.get_trans_desc()));
} else if (REACH_TIME_INTERVAL(10 * 1000 * 1000)) {
LOG_INFO("update safe weak read snapshot version success",
K(ret),
"snapshot_version",
snapshot_version,
"session",
session_info,
K(session_info.get_trans_desc()));
} else {
// do nothing
}
}
}
return ret;
}
int ObSqlTransControl::start_stmt(
ObExecContext& exec_ctx, const common::ObPartitionLeaderArray& pla, bool is_remote /* = false*/)
{
int ret = OB_SUCCESS;
ObTaskExecutorCtx* executor_ctx = GET_TASK_EXECUTOR_CTX(exec_ctx);
ObSQLSessionInfo* my_session = GET_MY_SESSION(exec_ctx);
const ObPhysicalPlanCtx* phy_plan_ctx = GET_PHY_PLAN_CTX(exec_ctx);
storage::ObPartitionService* ps = NULL;
const ObPhysicalPlan* phy_plan = NULL;
if (OB_ISNULL(executor_ctx) || OB_ISNULL(my_session) || OB_ISNULL(phy_plan_ctx) ||
OB_ISNULL(ps = executor_ctx->get_partition_service()) || OB_ISNULL(phy_plan = phy_plan_ctx->get_phy_plan())) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("null ptr", K(ret), K(executor_ctx), K(my_session), K(phy_plan_ctx), K(ps), K(phy_plan));
} else if (OB_UNLIKELY(my_session->is_zombie())) {
// session has been killed some moment ago
ret = OB_ERR_SESSION_INTERRUPTED;
LOG_WARN("session has been killed",
K(ret),
K(my_session->get_session_state()),
K(my_session->get_sessid()),
"proxy_sessid",
my_session->get_proxy_sessid());
} else if (OB_FAIL(my_session->set_cur_stmt_tables(pla.get_partitions(), phy_plan->get_stmt_type()))) {
LOG_WARN("set cur stmt tables fail", K(ret));
} else if (/*my_session->reuse_cur_sql_no() ||*/ my_session->is_standalone_stmt()) {
// skip start_stmt.
} else if (my_session->is_nested_session()) {
ObTransDesc& trans_desc = my_session->get_trans_desc();
TransResult& trans_result = my_session->get_trans_result();
if (OB_FAIL(ps->start_nested_stmt(trans_desc))) {
LOG_WARN("start nested stmt fail", K(ret), K(trans_desc));
} else if (OB_FAIL(trans_result.set_max_sql_no(trans_desc.get_sql_no()))) {
LOG_WARN("refresh max sql no fail", K(ret), K(trans_result), K(trans_desc));
}
/*
int64_t nested_count = my_session->get_nested_count();
int64_t max_sql_no = trans_result.get_max_sql_no();
const ObString nested_stmt = my_session->get_current_query_string();
LOG_INFO("start_nested_stmt", K(nested_count), K(nested_stmt), K(max_sql_no));
*/
} else if (my_session->is_fast_select()) {
if (pla.count() > 0) {
ObPartitionLeaderArray out_pla;
OZ(change_pla_info_(executor_ctx->get_table_locations(), pla, out_pla));
OX(my_session->set_read_uncommited(check_fast_select_read_uncommited(my_session->get_trans_desc(), out_pla)));
}
} else if (OB_UNLIKELY(ObSQLSessionInfo::INVALID_TYPE == my_session->get_session_type())) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("session type is not init", K(my_session), K(ret));
} else if (OB_UNLIKELY(INVALID_CONSISTENCY == phy_plan_ctx->get_consistency_level())) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("sql consistency level is invalid", K(ret), K(phy_plan_ctx->get_consistency_level()));
} else {
ObStmtParam stmt_param;
ObPartitionArray unreachable_partitions;
const bool is_retry_sql = (executor_ctx->get_retry_times() > 0);
const uint64_t tenant_id = my_session->get_effective_tenant_id();
transaction::ObTransDesc& trans_desc = my_session->get_trans_desc();
transaction::ObStartTransParam& trans_param = trans_desc.get_trans_param();
transaction::ObStmtDesc& stmt_desc = trans_desc.get_cur_stmt_desc();
int32_t trans_consistency_level = ObTransConsistencyLevel::UNKNOWN;
const int64_t last_safe_weak_read_snapshot = my_session->get_safe_weak_read_snapshot();
const int64_t last_safe_weak_read_snapshot_source = my_session->get_safe_weak_read_snapshot_source();
int64_t auto_spec_snapshot_version = ObTransVersion::INVALID_TRANS_VERSION;
const bool is_trx_elr = my_session->get_early_lock_release();
my_session->get_trans_result().clear_stmt_result();
if (OB_FAIL(set_trans_param_(*my_session,
*phy_plan,
phy_plan_ctx->get_consistency_level(),
trans_consistency_level,
auto_spec_snapshot_version))) {
LOG_WARN("set trans param fail", K(ret), KPC(my_session));
} else if (OB_FAIL(stmt_param.init(tenant_id,
get_stmt_timeout_ts(*phy_plan_ctx),
is_retry_sql,
last_safe_weak_read_snapshot,
last_safe_weak_read_snapshot_source,
is_trx_elr))) {
LOG_WARN("ObStmtParam init error", K(ret), K(tenant_id), K(is_retry_sql), K(last_safe_weak_read_snapshot));
} else {
stmt_desc.phy_plan_type_ = phy_plan->has_nested_sql() ? OB_PHY_PLAN_UNCERTAIN : phy_plan->get_location_type();
stmt_desc.stmt_type_ = phy_plan->get_stmt_type();
stmt_desc.is_sfu_ = phy_plan->has_for_update();
stmt_desc.execution_id_ = my_session->get_current_execution_id();
stmt_desc.sql_id_.assign_strive(phy_plan->stat_.sql_id_);
stmt_desc.trace_id_adaptor_.set(ObCurTraceId::get());
const ObString& app_trace_id_str = my_session->get_app_trace_id();
stmt_desc.app_trace_id_str_.reset();
stmt_desc.app_trace_id_str_.assign_buffer(stmt_desc.buffer_, sizeof(stmt_desc.buffer_));
stmt_desc.app_trace_id_str_.write(app_trace_id_str.ptr(), app_trace_id_str.length());
stmt_desc.cur_query_start_time_ = my_session->get_query_start_time();
stmt_desc.stmt_tenant_id_ = my_session->get_effective_tenant_id();
stmt_desc.inner_sql_ = my_session->is_inner();
stmt_desc.consistency_level_ = trans_consistency_level;
stmt_desc.trx_lock_timeout_ = my_session->get_trx_lock_timeout();
if (my_session->has_valid_read_snapshot_version()) {
stmt_desc.cur_stmt_specified_snapshot_version_ = my_session->get_read_snapshot_version();
} else if (auto_spec_snapshot_version > 0) {
stmt_desc.cur_stmt_specified_snapshot_version_ = auto_spec_snapshot_version;
}
/*
const ObString root_stmt = my_session->get_current_query_string();
LOG_INFO("start_root_stmt", K(root_stmt));
*/
ObPartitionLeaderArray out_pla;
if (pla.count() > 0 && OB_FAIL(change_pla_info_(executor_ctx->get_table_locations(), pla, out_pla))) {
LOG_WARN("change pla info error", K(ret), K(stmt_param), K(trans_desc), K(pla));
} else if (OB_FAIL(ps->start_stmt(stmt_param, trans_desc, out_pla, unreachable_partitions))) {
LOG_WARN("fail start stmt",
K(ret),
K(trans_desc),
K(pla),
K(out_pla),
K(is_remote),
K(unreachable_partitions.count()));
if ((is_transaction_rpc_timeout_err(ret) || is_data_not_readable_err(ret) || is_partition_change_error(ret)) &&
!is_remote) {
ObAddr unreachable_server;
for (int64_t i = 0; i < unreachable_partitions.count(); ++i) {
unreachable_server.reset();
int tmp_ret = OB_SUCCESS;
if (OB_UNLIKELY(
OB_SUCCESS != (tmp_ret = out_pla.find_leader(unreachable_partitions.at(i), unreachable_server)))) {
LOG_ERROR("can not find unreachable server", K(ret), K(tmp_ret), K(i), K(unreachable_partitions.at(i)));
} else if (OB_UNLIKELY(OB_SUCCESS !=
(tmp_ret = my_session->get_retry_info_for_update().add_invalid_server_distinctly(
unreachable_server, true)))) {
LOG_WARN("fail to add invalid server distinctly",
K(ret),
K(tmp_ret),
K(unreachable_server),
K(my_session->get_retry_info()));
}
}
}
} else {
OZ(update_safe_weak_read_snapshot(trans_param.is_bounded_staleness_read(),
*my_session,
trans_param.get_read_snapshot_type(),
trans_desc.get_snapshot_version()));
OZ(my_session->set_start_stmt());
if (phy_plan->is_contain_oracle_trx_level_temporary_table()) {
trans_desc.set_trx_level_temporary_table_involved();
}
DEBUG_TRANS_STAGE("start_stmt");
}
}
}
return ret;
}
int ObSqlTransControl::specify_stmt_snapshot_version_for_slave_cluster_sql_(const ObSQLSessionInfo& session,
const stmt::StmtType& literal_stmt_type, const bool& is_contain_inner_table, const int32_t consistency_type,
const int32_t read_snapshot_type, int64_t& auto_spec_snapshot_version)
{
int ret = OB_SUCCESS;
share::ObSchemaStatusProxy* schema_status_proxy = GCTX.schema_status_proxy_;
share::schema::ObRefreshSchemaStatus slave_schema_status;
const uint64_t tenant_id = session.get_effective_tenant_id();
const bool is_standby_cluster = GCTX.is_standby_cluster();
const bool is_inner_sql = session.is_inner();
const bool is_from_show_stmt = (literal_stmt_type != stmt::T_NONE);
const bool stmt_spec_snapshot_version_is_valid = session.has_valid_read_snapshot_version();
if (is_standby_cluster && is_contain_inner_table && !is_inner_sql && OB_SYS_TENANT_ID != tenant_id &&
!stmt_spec_snapshot_version_is_valid && ObTransConsistencyType::BOUNDED_STALENESS_READ == consistency_type) {
if (OB_ISNULL(schema_status_proxy)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("invalid schema_status_proxy", K(schema_status_proxy));
} else if (OB_FAIL(schema_status_proxy->nonblock_get(tenant_id, slave_schema_status))) {
LOG_WARN("get schema status on slave cluster fail",
K(ret),
K(tenant_id),
K(session),
K(is_standby_cluster),
K(is_from_show_stmt),
K(is_contain_inner_table),
K(consistency_type),
K(read_snapshot_type));
if (OB_ENTRY_NOT_EXIST == ret) {
ret = OB_REPLICA_NOT_READABLE;
}
} else if (OB_UNLIKELY(slave_schema_status.snapshot_timestamp_ <= 0)) {
ret = OB_REPLICA_NOT_READABLE;
LOG_WARN("tenant schema status is invalid on slave cluster",
K(ret),
K(tenant_id),
K(slave_schema_status),
K(session),
K(is_standby_cluster),
K(is_from_show_stmt),
K(is_contain_inner_table),
K(consistency_type),
K(read_snapshot_type));
} else {
auto_spec_snapshot_version = slave_schema_status.snapshot_timestamp_;
LOG_INFO("specify statement snapshot version for slave cluster SQL",
K(tenant_id),
K(slave_schema_status),
K(is_standby_cluster),
K(is_inner_sql),
K(is_from_show_stmt),
K(is_contain_inner_table),
K(consistency_type),
K(read_snapshot_type),
K(session));
}
if (OB_REPLICA_NOT_READABLE == ret) {
LOG_WARN("slave cluster inner table readable snapshot version still not ready, need retry",
K(ret),
K(tenant_id),
K(is_standby_cluster),
K(is_inner_sql),
K(is_from_show_stmt),
K(consistency_type),
K(read_snapshot_type),
K(is_contain_inner_table),
K(session));
}
} else {
}
return ret;
}
int ObSqlTransControl::trans_param_compat_with_cluster_before_2200_(ObTransDesc& trans_desc)
{
int ret = OB_SUCCESS;
if (trans_desc.get_cluster_version() < CLUSTER_VERSION_2200) {
if (trans_desc.get_snapshot_version() == ObTransVersion::INVALID_TRANS_VERSION) {
trans_desc.set_snapshot_version(0);
}
}
return ret;
}
int ObSqlTransControl::set_trans_param_(ObSQLSessionInfo& my_session, const ObPhysicalPlan& phy_plan,
const ObConsistencyLevel sql_consistency_level, int32_t& trans_consistency_level,
int64_t& auto_spec_snapshot_version)
{
int ret = OB_SUCCESS;
transaction::ObTransDesc& trans_desc = my_session.get_trans_desc();
int32_t trans_consistency_type = ObTransConsistencyType::UNKNOWN;
int32_t read_snapshot_type = ObTransReadSnapshotType::UNKNOWN;
if (OB_FAIL(decide_trans_read_interface_specs("ObSqlTransControl::start_stmt",
my_session,
phy_plan.get_stmt_type(),
phy_plan.get_literal_stmt_type(),
phy_plan.has_for_update(),
phy_plan.is_contain_inner_table(),
sql_consistency_level,
phy_plan.need_consistent_snapshot(),
trans_consistency_level,
trans_consistency_type,
read_snapshot_type))) {
LOG_WARN("fail to decide trans read interface specs",
K(ret),
K(phy_plan.get_stmt_type()),
K(phy_plan.get_literal_stmt_type()),
K(phy_plan.has_for_update()),
K(phy_plan.is_contain_inner_table()),
K(sql_consistency_level),
K(my_session));
} else if (GCTX.is_standby_cluster() && OB_FAIL(specify_stmt_snapshot_version_for_slave_cluster_sql_(my_session,
phy_plan.get_literal_stmt_type(),
phy_plan.is_contain_inner_table(),
trans_consistency_type,
read_snapshot_type,
auto_spec_snapshot_version))) {
LOG_WARN("specify statement snapshot version for slave cluster sql fail", K(ret), K(my_session));
} else if (OB_FAIL(trans_param_compat_with_cluster_before_2200_(trans_desc))) {
LOG_WARN("trans param compat with cluster before 2200 fail", K(ret), K(trans_desc));
} else {
trans_desc.get_trans_param().set_read_snapshot_type(read_snapshot_type);
if (ObTransConsistencyType::UNKNOWN == my_session.get_trans_consistency_type()) {
my_session.set_trans_consistency_type(trans_consistency_type);
trans_desc.get_trans_param().set_consistency_type(trans_consistency_type);
} else if (trans_desc.get_trans_param().get_consistency_type() != trans_consistency_type) {
ret = OB_NOT_SUPPORTED;
LOG_WARN("different consistency type in one transaction, unexpected error",
K(ret),
K(trans_consistency_type),
K(trans_desc));
LOG_USER_ERROR(OB_NOT_SUPPORTED, "different consistency type in one transaction");
}
if (OB_SUCCESS == ret) {
if (ObTransConsistencyType::BOUNDED_STALENESS_READ == trans_consistency_type) {
trans_desc.get_trans_param().set_access_mode(transaction::ObTransAccessMode::READ_ONLY);
}
}
}
LOG_DEBUG("ObSqlTransControl::start_stmt::set_trans_param",
K(sql_consistency_level),
K(trans_consistency_type),
K(trans_consistency_level),
K(read_snapshot_type),
K(auto_spec_snapshot_version),
K(trans_desc),
K(my_session));
return ret;
}
int ObSqlTransControl::get_discard_participants(const ObPartitionArray& all_partitions,
const ObPartitionArray& response_partitions, ObPartitionArray& discard_partitions)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(all_partitions.count() > response_partitions.count())) {
for (int64_t i = 0; OB_SUCC(ret) && i < all_partitions.count(); ++i) {
if (OB_UNLIKELY(!has_exist_in_array(response_partitions, all_partitions.at(i)))) {
OZ(discard_partitions.push_back(all_partitions.at(i)), i, all_partitions.at(i));
}
}
LOG_DEBUG("get discard participants", K(ret), K(all_partitions), K(response_partitions), K(discard_partitions));
}
return ret;
}
int ObSqlTransControl::create_savepoint(ObExecContext& exec_ctx, const ObString& sp_name)
{
int ret = OB_SUCCESS;
ObSQLSessionInfo* my_session = GET_MY_SESSION(exec_ctx);
ObTaskExecutorCtx* executor_ctx = GET_TASK_EXECUTOR_CTX(exec_ctx);
storage::ObPartitionService* ps = NULL;
if (OB_ISNULL(executor_ctx) || OB_ISNULL(my_session) || OB_ISNULL(ps = executor_ctx->get_partition_service())) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("null ptr", K(ret), K(executor_ctx), K(my_session), K(ps));
} else if (OB_UNLIKELY(my_session->is_zombie())) {
// session has been killed some moment ago
ret = OB_ERR_SESSION_INTERRUPTED;
LOG_WARN("session has been killed",
K(ret),
K(my_session->get_session_state()),
K(my_session->get_sessid()),
"proxy_sessid",
my_session->get_proxy_sessid());
} else if (OB_FAIL(ps->savepoint(my_session->get_trans_desc(), sp_name))) {
LOG_WARN("fail to create savepoint", K(ret), K(sp_name));
}
return ret;
}
int ObSqlTransControl::rollback_savepoint(ObExecContext& exec_ctx, const ObString& sp_name)
{
int ret = OB_SUCCESS;
ObSQLSessionInfo* my_session = GET_MY_SESSION(exec_ctx);
ObTaskExecutorCtx* executor_ctx = GET_TASK_EXECUTOR_CTX(exec_ctx);
const ObPhysicalPlanCtx* phy_plan_ctx = GET_PHY_PLAN_CTX(exec_ctx);
storage::ObPartitionService* ps = NULL;
transaction::ObStmtParam stmt_param;
if (OB_ISNULL(executor_ctx) || OB_ISNULL(my_session) || OB_ISNULL(ps = executor_ctx->get_partition_service())) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("null ptr", K(ret), K(executor_ctx), K(my_session), K(ps));
} else if (OB_UNLIKELY(my_session->is_zombie())) {
// session has been killed some moment ago
ret = OB_ERR_SESSION_INTERRUPTED;
LOG_WARN("session has been killed",
K(ret),
K(my_session->get_session_state()),
K(my_session->get_sessid()),
"proxy_sessid",
my_session->get_proxy_sessid());
} else if (OB_FAIL(
stmt_param.init(my_session->get_effective_tenant_id(), get_stmt_timeout_ts(*phy_plan_ctx), false))) {
LOG_WARN("ObStmtParam init error", K(ret), K(my_session));
} else if (OB_FAIL(ps->rollback_savepoint(my_session->get_trans_desc(), sp_name, stmt_param))) {
LOG_WARN("fail to rollback savepoint", K(ret), K(sp_name), K(stmt_param));
}
return ret;
}
int ObSqlTransControl::release_savepoint(ObExecContext& exec_ctx, const ObString& sp_name)
{
int ret = OB_SUCCESS;
ObSQLSessionInfo* my_session = GET_MY_SESSION(exec_ctx);
ObTaskExecutorCtx* executor_ctx = GET_TASK_EXECUTOR_CTX(exec_ctx);
storage::ObPartitionService* ps = NULL;
if (OB_ISNULL(executor_ctx) || OB_ISNULL(my_session) || OB_ISNULL(ps = executor_ctx->get_partition_service())) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("null ptr", K(ret), K(executor_ctx), K(my_session), K(ps));
} else if (OB_UNLIKELY(my_session->is_zombie())) {
// session has been killed some moment ago
ret = OB_ERR_SESSION_INTERRUPTED;
LOG_WARN("session has been killed",
K(ret),
K(my_session->get_session_state()),
K(my_session->get_sessid()),
"proxy_sessid",
my_session->get_proxy_sessid());
} else if (OB_FAIL(ps->release_savepoint(my_session->get_trans_desc(), sp_name))) {
LOG_WARN("fail to release savepoint", K(ret), K(sp_name));
}
return ret;
}
int ObSqlTransControl::xa_rollback_all_changes(ObExecContext& exec_ctx)
{
int ret = OB_SUCCESS;
ObSQLSessionInfo* my_session = GET_MY_SESSION(exec_ctx);
ObTaskExecutorCtx* executor_ctx = GET_TASK_EXECUTOR_CTX(exec_ctx);
const ObPhysicalPlanCtx* phy_plan_ctx = GET_PHY_PLAN_CTX(exec_ctx);
storage::ObPartitionService* ps = NULL;
transaction::ObStmtParam stmt_param;
if (OB_ISNULL(executor_ctx) || OB_ISNULL(my_session) || OB_ISNULL(ps = executor_ctx->get_partition_service())) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("null ptr", K(ret), K(executor_ctx), K(my_session), K(ps));
} else if (!my_session->is_in_transaction() || !my_session->get_trans_desc().is_xa_local_trans()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("called in wrong context", K(ret), K(my_session->get_trans_desc()));
} else if (OB_UNLIKELY(my_session->is_zombie())) {
// session has been killed some moment ago
ret = OB_ERR_SESSION_INTERRUPTED;
LOG_WARN("session has been killed",
K(ret),
K(my_session->get_session_state()),
K(my_session->get_sessid()),
"proxy_sessid",
my_session->get_proxy_sessid());
} else if (OB_FAIL(
stmt_param.init(my_session->get_effective_tenant_id(), get_stmt_timeout_ts(*phy_plan_ctx), false))) {
LOG_WARN("ObStmtParam init error", K(ret), K(my_session));
} else if (OB_FAIL(ps->xa_rollback_all_changes(my_session->get_trans_desc(), stmt_param))) {
LOG_WARN("fail to rollback savepoint", K(ret), K(stmt_param), K(my_session->get_trans_desc()));
}
return ret;
}
int ObSqlTransControl::end_stmt(ObExecContext& exec_ctx, const bool is_rollback)
{
int ret = OB_SUCCESS;
ObTaskExecutorCtx* executor_ctx = GET_TASK_EXECUTOR_CTX(exec_ctx);
ObSQLSessionInfo* my_session = GET_MY_SESSION(exec_ctx);
storage::ObPartitionService* ps = NULL;
common::ObPartitionLeaderArray pla;
bool skip_end_stmt = false;
if (OB_ISNULL(executor_ctx) || OB_ISNULL(my_session) || OB_ISNULL(ps = executor_ctx->get_partition_service())) {
LOG_WARN("can't get partition service or my_session", K(executor_ctx), K(my_session), K(ps));
ret = OB_ERR_UNEXPECTED;
} else if (my_session->is_fast_select() || my_session->is_standalone_stmt()) {
skip_end_stmt = true;
} else if (FALSE_IT(skip_end_stmt = my_session->reuse_cur_sql_no())) {
} else if (OB_UNLIKELY(my_session->is_zombie())) {
// session has been killed some moment ago
ret = OB_ERR_SESSION_INTERRUPTED;
LOG_WARN("session has been killed",
K(ret),
K(my_session->get_session_state()),
K(my_session->get_sessid()),
"proxy_sessid",
my_session->get_proxy_sessid());
} else if (OB_FAIL(get_participants(
executor_ctx->get_table_locations(), pla, my_session->get_is_in_retry_for_dup_tbl()))) {
LOG_WARN("get participants failed", K(ret), K(executor_ctx->get_table_locations()));
} else if (OB_FAIL(merge_stmt_partitions(exec_ctx, *my_session))) {
LOG_WARN("fail to merge stmt partitions", K(ret));
} else if (skip_end_stmt) {
// nesetd sql will not end_stmt().
LOG_DEBUG("skip end_stmt",
K(ret),
"cur_stmt",
my_session->get_current_query_string(),
"trans_result",
my_session->get_trans_result());
} else if (my_session->is_nested_session()) {
TransResult& trans_result = my_session->get_trans_result();
ObTransDesc& trans_desc = my_session->get_trans_desc();
ObPartitionArray cur_stmt_all_pgs;
if (trans_result.get_total_partitions().count() > 0 &&
OB_FAIL(change_pkeys_to_pgs_(
executor_ctx->get_table_locations(), trans_result.get_total_partitions(), cur_stmt_all_pgs))) {
LOG_WARN("change cur stmt pkeys to pgs error", K(ret), K(trans_result));
trans_desc.set_need_rollback();
} else if (OB_FAIL(ps->end_nested_stmt(trans_desc, cur_stmt_all_pgs, is_rollback))) {
LOG_WARN("end nested stmt fail", K(ret), K(trans_desc), K(cur_stmt_all_pgs), K(is_rollback));
}
/*
int64_t nested_count = my_session->get_nested_count();
const ObString nested_stmt = my_session->get_current_query_string();
LOG_INFO("end_nested_stmt", K(nested_count), K(nested_stmt), K(trans_result), K(is_rollback));
*/
} else {
transaction::ObTransDesc& trans_desc = my_session->get_trans_desc();
TransResult& trans_result = my_session->get_trans_result();
ObPartitionArray discard_partitions;
if (GET_MIN_CLUSTER_VERSION() >= CLUSTER_VERSION_2100 && !is_rollback) {
// param discard_partitions is added in version 2.1
// in order to compatible with the old server below version 2.1
// only calculate the discard partition when all server are updated to version 2.1
OZ(get_discard_participants(
trans_result.get_total_partitions(), trans_result.get_response_partitions(), discard_partitions),
trans_result,
discard_partitions);
}
ObPartitionArray cur_stmt_all_pgs;
ObPartitionArray cur_stmt_discard_pgs;
ObPartitionLeaderArray cur_stmt_pla;
if (trans_result.get_total_partitions().count() > 0 &&
OB_FAIL(change_pkeys_to_pgs_(
executor_ctx->get_table_locations(), trans_result.get_total_partitions(), cur_stmt_all_pgs))) {
LOG_WARN("change cur stmt pkeys to pgs error", K(ret), K(trans_result), K(trans_desc));
trans_desc.set_need_rollback();
} else if (discard_partitions.count() > 0 &&
OB_FAIL(change_pkeys_to_pgs_(
executor_ctx->get_table_locations(), discard_partitions, cur_stmt_discard_pgs))) {
LOG_WARN("change cur stmt discard pkeys to pgs error", K(ret), K(trans_result), K(trans_desc));
trans_desc.set_need_rollback();
} else if (pla.count() > 0 && OB_FAIL(change_pla_info_(executor_ctx->get_table_locations(), pla, cur_stmt_pla))) {
LOG_WARN("change pla info error", K(ret), K(pla), K(cur_stmt_pla), K(trans_result), K(trans_desc));
trans_desc.set_need_rollback();
} else {
/*
const ObString root_stmt = my_session->get_current_query_string();
LOG_INFO("end_root_stmt", K(root_stmt), KP(&trans_desc),
"trans_desc_max_sql_no", trans_desc.get_max_sql_no(),
"trans_result_max_sql_no", trans_result.get_max_sql_no());
*/
OZ(ps->end_stmt(is_rollback,
trans_result.is_incomplete(),
cur_stmt_all_pgs,
trans_result.get_part_epoch_list(),
cur_stmt_discard_pgs,
cur_stmt_pla,
trans_desc),
trans_result,
discard_partitions,
trans_desc);
DEBUG_TRANS_STAGE("end_stmt");
}
if (OB_FAIL(ret)) {
LOG_WARN("call end_stmt",
K(ret),
K(trans_desc),
"cur_stmt",
my_session->get_current_query_string(),
K(cur_stmt_all_pgs),
K(cur_stmt_discard_pgs),
K(cur_stmt_pla),
K(discard_partitions),
K(trans_result));
}
}
if (OB_NOT_NULL(my_session) && !skip_end_stmt && !my_session->is_nested_session()) {
my_session->get_trans_result().clear_stmt_result();
int end_ret = my_session->set_end_stmt();
if (OB_SUCCESS != end_ret) {
LOG_ERROR("failed to set end stmt", K(end_ret));
if (OB_SUCCESS == ret) {
ret = end_ret;
}
}
}
return ret;
}
int ObSqlTransControl::start_participant(
ObExecContext& exec_ctx, const common::ObPartitionArray& participants, bool is_remote /* = false*/)
{
int ret = OB_SUCCESS;
ObTaskExecutorCtx* executor_ctx = GET_TASK_EXECUTOR_CTX(exec_ctx);
ObSQLSessionInfo* my_session = GET_MY_SESSION(exec_ctx);
storage::ObPartitionService* ps = NULL;
const ObPhysicalPlan* phy_plan = NULL;
if (OB_ISNULL(executor_ctx) || OB_ISNULL(my_session) || OB_ISNULL(ps = executor_ctx->get_partition_service()) ||
OB_ISNULL(phy_plan = GET_PHY_PLAN_CTX(exec_ctx)->get_phy_plan())) {
LOG_WARN("can't get partition service or my_session", K(executor_ctx), K(my_session), K(ps));
ret = OB_ERR_UNEXPECTED;
} else if (OB_UNLIKELY(my_session->is_zombie())) {
// session has been killed some moment ago
ret = OB_ERR_SESSION_INTERRUPTED;
LOG_WARN("session has been killed",
K(ret),
K(my_session->get_session_state()),
K(my_session->get_sessid()),
"proxy_sessid",
my_session->get_proxy_sessid());
} else if (my_session->is_fast_select() || my_session->is_standalone_stmt()) {
// do nothing ...
} else {
transaction::ObTransDesc& trans_desc = my_session->get_trans_desc();
TransResult& trans_result = my_session->get_trans_result();
if (OB_FAIL(trans_result.merge_total_partitions(participants))) {
LOG_WARN("fail to merge partitions", K(ret), K(participants));
}
if (OB_SUCC(ret)) {
ObPartitionArray pgs;
ObPartitionEpochArray partition_epoch_arr;
const ObTransID& trans_id = trans_desc.get_trans_id();
int64_t sql_no = trans_desc.get_sql_no();
bool is_forbidden = false;
if (participants.count() > 0 &&
OB_FAIL(change_pkeys_to_pgs_(executor_ctx->get_table_locations(), participants, pgs))) {
LOG_WARN("change pkeys to pgs error", K(ret), K(participants), K(pgs), K(trans_desc));
} else if (OB_FAIL(ps->is_trans_forbidden_sql_no(trans_id, pgs, sql_no, is_forbidden))) {
LOG_WARN("get trans forbidden error", K(ret), K(participants), K(pgs), K(trans_desc));
} else if (is_forbidden) {
ret = OB_ERR_INTERRUPTED;
LOG_WARN("execution is interrupted", K(ret), K(participants), K(pgs), K(trans_desc));
} else if (OB_FAIL(ps->start_participant(trans_desc, pgs, partition_epoch_arr))) {
if (OB_NOT_MASTER != ret) {
LOG_WARN(
"fail start participants", K(ret), K(is_remote), K(participants), K(pgs), K(trans_result), K(trans_desc));
}
if (is_data_not_readable_err(ret) && !is_remote) {
int add_ret = OB_SUCCESS;
if (OB_UNLIKELY(
OB_SUCCESS != (add_ret = my_session->get_retry_info_for_update().add_invalid_server_distinctly(
exec_ctx.get_addr(), true)))) {
LOG_WARN("fail to add local addr to invalid servers distinctly",
K(ret),
K(add_ret),
K(exec_ctx.get_addr()),
K(my_session->get_retry_info()));
}
}
} else {
DEBUG_TRANS_STAGE("start_participant");
LOG_DEBUG("start participant", K(participants), K(trans_result));
OZ(trans_result.merge_response_partitions(participants), participants);
OZ(trans_result.merge_part_epoch_list(partition_epoch_arr), partition_epoch_arr);
}
NG_TRACE_EXT(start_part, OB_ID(ret), ret, OB_ID(trans_id), trans_desc.get_trans_id());
}
}
return ret;
}
int ObSqlTransControl::end_participant(
ObExecContext& exec_ctx, const bool is_rollback, const common::ObPartitionArray& participants)
{
int ret = OB_SUCCESS;
ObTaskExecutorCtx* executor_ctx = GET_TASK_EXECUTOR_CTX(exec_ctx);
ObSQLSessionInfo* my_session = GET_MY_SESSION(exec_ctx);
storage::ObPartitionService* ps = NULL;
if (OB_ISNULL(executor_ctx) || OB_ISNULL(my_session) || OB_ISNULL(ps = executor_ctx->get_partition_service())) {
LOG_WARN("can't get partition service or my_session", K(executor_ctx), K(my_session), K(ps));
ret = OB_ERR_UNEXPECTED;
} else if (OB_UNLIKELY(my_session->is_zombie())) {
// session has been killed some moment ago
ret = OB_ERR_SESSION_INTERRUPTED;
LOG_WARN("session has been killed",
K(ret),
K(my_session->get_session_state()),
K(my_session->get_sessid()),
"proxy_sessid",
my_session->get_proxy_sessid());
} else if (my_session->is_fast_select() || my_session->is_standalone_stmt()) {
// do nothing ...
} else {
transaction::ObTransDesc& trans_desc = my_session->get_trans_desc();
ObPartitionArray pgs;
if (participants.count() > 0 &&
OB_FAIL(change_pkeys_to_pgs_(executor_ctx->get_table_locations(), participants, pgs))) {
LOG_WARN("change pkeys to pgs error", K(ret), K(participants), K(pgs), K(trans_desc));
} else if (OB_FAIL(ps->end_participant(is_rollback, trans_desc, pgs))) {
LOG_WARN("fail end participant", K(ret), K(is_rollback), K(participants), K(pgs), K(trans_desc));
} else {
// do nothing
DEBUG_TRANS_STAGE("end_participant");
}
}
NG_TRACE_EXT(end_participant, OB_ID(ret), ret);
return ret;
}
int ObSqlTransControl::get_pg_key_(
const ObIArray<ObPhyTableLocation>& table_locations, const ObPartitionKey& pkey, ObPartitionKey& pg_key)
{
int ret = OB_SUCCESS;
bool hit = false;
for (int64_t i = 0; OB_SUCC(ret) && !hit && i < table_locations.count(); ++i) {
const ObPartitionReplicaLocationIArray& part_locations = table_locations.at(i).get_partition_location_list();
for (int64_t j = 0; OB_SUCC(ret) && j < part_locations.count(); ++j) {
ObPartitionKey tmp_pkey;
if (OB_FAIL(part_locations.at(j).get_partition_key(tmp_pkey))) {
LOG_WARN("get partition key error", K(ret), "location", part_locations.at(j));
} else if (pkey == tmp_pkey) {
pg_key = part_locations.at(j).get_pg_key();
hit = true;
break;
} else {
// do nothing
}
}
}
if (OB_SUCC(ret)) {
if (!hit || !pg_key.is_valid()) {
LOG_DEBUG("cannot find pg key", K(ret), K(table_locations), K(pkey), K(pg_key));
if (OB_FAIL(storage::ObPartitionService::get_instance().get_pg_key(pkey, pg_key))) {
LOG_WARN("get pg key error", K(ret), K(pkey), K(pg_key));
}
}
}
return ret;
}
int ObSqlTransControl::change_pkeys_to_pgs_(
const ObIArray<ObPhyTableLocation>& table_locations, const ObPartitionArray& pkeys, ObPartitionArray& pg_keys)
{
int ret = OB_SUCCESS;
if (pkeys.count() <= 0) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(ret), K(pkeys), K(pg_keys), K(table_locations));
} else {
for (int64_t i = 0; OB_SUCC(ret) && i < pkeys.count(); ++i) {
ObPGKey pg_key;
if (table_locations.count() <= 0) {
if (OB_FAIL(storage::ObPartitionService::get_instance().get_pg_key(pkeys.at(i), pg_key))) {
LOG_WARN("get pg key error", K(ret), K(pkeys), K(pg_key));
}
} else if (OB_FAIL(get_pg_key_(table_locations, pkeys.at(i), pg_key))) {
LOG_WARN("get pg key error", K(ret), "key", pkeys.at(i));
} else {
// do nothing
}
if (OB_SUCC(ret)) {
int64_t j = 0;
for (; j < pg_keys.count(); ++j) {
if (pg_key == pg_keys.at(j)) {
break;
}
}
if (j == pg_keys.count()) {
if (OB_FAIL(pg_keys.push_back(pg_key))) {
LOG_WARN("pg keys push back error", K(ret), K(pg_key), K(pg_keys));
}
}
}
}
}
return ret;
}
int ObSqlTransControl::change_pla_info_(const ObIArray<ObPhyTableLocation>& table_locations,
const ObPartitionLeaderArray& pla, ObPartitionLeaderArray& out_pla)
{
int ret = OB_SUCCESS;
if (pla.count() <= 0) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(ret), K(pla), K(table_locations));
} else {
for (int64_t i = 0; OB_SUCC(ret) && i < pla.count(); ++i) {
ObPGKey pg_key;
if (table_locations.count() <= 0) {
if (OB_FAIL(storage::ObPartitionService::get_instance().get_pg_key(pla.get_partitions().at(i), pg_key))) {
LOG_WARN("get pg key error", K(ret), K(table_locations), K(pla));
}
} else if (OB_FAIL(get_pg_key_(table_locations, pla.get_partitions().at(i), pg_key))) {
LOG_WARN("get pg key error", K(ret), K(table_locations), K(pla));
} else {
// do nothing
}
if (OB_SUCC(ret)) {
const ObAddr& addr = pla.get_leaders().at(i);
const common::ObPartitionType& type = pla.get_types().at(i);
int64_t j = 0;
for (; j < out_pla.count(); ++j) {
if (pg_key == out_pla.get_partitions().at(j) && addr == out_pla.get_leaders().at(j) &&
type == out_pla.get_types().at(j)) {
break;
}
}
if (j == out_pla.count()) {
if (OB_FAIL(out_pla.push(pg_key, addr, type))) {
LOG_WARN("out pla push back error", K(pg_key), K(addr), K(type));
}
}
}
}
}
return ret;
}
int ObSqlTransControl::get_participants(
const ObIArray<ObPhyTableLocation>& table_locations, ObPartitionLeaderArray& pla, bool is_retry_for_dup_tbl)
{
int ret = OB_SUCCESS;
uint64_t N = table_locations.count();
ObPartitionKey key;
common::ObPartitionType type;
// SQL_LOG(DEBUG, "phy table location size:", K(N));
for (uint64_t i = 0; OB_SUCC(ret) && i < N; ++i) {
const ObPartitionReplicaLocationIArray& part_locations = table_locations.at(i).get_partition_location_list();
int64_t M = part_locations.count();
bool is_dup = table_locations.at(i).is_duplicate_table();
bool is_dup_not_dml = table_locations.at(i).is_duplicate_table_not_in_dml();
for (int64_t j = 0; OB_SUCC(ret) && j < M; ++j) {
if (is_dup) {
if (part_locations.at(j).get_replica_location().is_follower()) {
type = ObPartitionType::DUPLICATE_FOLLOWER_PARTITION;
} else if (is_dup_not_dml && false == is_retry_for_dup_tbl) {
type = ObPartitionType::DUPLICATE_FOLLOWER_PARTITION;
} else {
type = ObPartitionType::DUPLICATE_LEADER_PARTITION;
}
} else {
type = ObPartitionType::NORMAL_PARTITION;
}
// LOG_INFO("partition location", K(is_dup), K(type), K(is_dup_not_dml), K(part_locations.at(j)));
if (OB_FAIL(part_locations.at(j).get_partition_key(key))) {
LOG_WARN("failed to get partition key", K(ret));
} else if (OB_UNLIKELY(is_virtual_table(key.table_id_))) {
} else if (OB_FAIL(append_participant_to_array_distinctly(
pla, key, type, part_locations.at(j).get_replica_location().server_))) {
/*
* In the case of multiple alias table corresponding to the same physical table, the
* key we get could be duplicate. To avoid having same partition as participants more
* than once, we need to check if we have already added the key in the list.
*/
LOG_WARN("fail to push back participant", K(ret), K(i), K(key), K(is_dup));
}
} /* for */
} /* for */
return ret;
}
int ObSqlTransControl::get_participants(ObExecContext& exec_ctx, ObPartitionLeaderArray& pla)
{
int ret = OB_SUCCESS;
ObTaskExecutorCtx* task_exec_ctx = exec_ctx.get_task_executor_ctx();
ObSQLSessionInfo* my_session = GET_MY_SESSION(exec_ctx);
if (OB_ISNULL(task_exec_ctx) || OB_ISNULL(my_session)) {
ret = OB_NOT_INIT;
LOG_WARN("task executor ctx should not be NULL", K(my_session));
} else if (OB_FAIL(get_participants(
task_exec_ctx->get_table_locations(), pla, my_session->get_is_in_retry_for_dup_tbl()))) {
LOG_WARN("get participants failed", K(ret));
}
return ret;
}
int ObSqlTransControl::append_participant_to_array_distinctly(
ObPartitionLeaderArray& pla, const ObPartitionKey& key, const common::ObPartitionType type, const ObAddr& svr)
{
int ret = OB_SUCCESS;
bool found = false;
// check if the key already exists
for (int64_t i = 0; !found && i < pla.count(); i++) {
if (pla.get_partitions().at(i) == key && pla.get_leaders().at(i) == svr) {
found = true;
}
}
// if not, add it
if (!found) {
ret = pla.push(key, svr, type);
}
return ret;
}
int ObSqlTransControl::get_participants(ObExecContext& exec_ctx, common::ObPartitionArray& participants)
{
// DO NOT reset participants for init, see ObResultSet::merge_stmt_partitions().
int ret = OB_SUCCESS;
ObTaskExecutorCtx& task_exec_ctx = exec_ctx.get_task_exec_ctx();
const ObIArray<ObPhyTableLocation>& table_locations = task_exec_ctx.get_table_locations();
uint64_t N = table_locations.count();
SQL_LOG(DEBUG, "phy table location size:", K(N));
for (uint64_t i = 0; OB_SUCC(ret) && i < N; ++i) {
const ObPartitionReplicaLocationIArray& part_locations = table_locations.at(i).get_partition_location_list();
int64_t M = part_locations.count();
for (int64_t j = 0; OB_SUCC(ret) && j < M; ++j) {
// LOG_INFO("partition location", K(part_locations.at(j)));
ObPartitionKey key;
if (OB_FAIL(part_locations.at(j).get_partition_key(key))) {
LOG_WARN("failed to get partition key", K(ret));
} else if (OB_UNLIKELY(is_virtual_table(key.table_id_))) {
} else if (OB_FAIL(append_participant_to_array_distinctly(participants, key))) {
/*
* In the case of multiple alias table corresponding to the same physical table, the
* key we get could be duplicate. To avoid having same partition as participants more
* than once, we need to check if we have already added the key in the list.
*/
LOG_WARN("fail to push back participant", K(ret), K(i), K(j), K(key));
}
}
}
return ret;
}
int ObSqlTransControl::get_root_job_participants(
ObExecContext& exec_ctx, const ObPhyOperator& root_job_root_op, ObPartitionArray& participants)
{
int ret = OB_SUCCESS;
ObTaskExecutorCtx& task_exec_ctx = exec_ctx.get_task_exec_ctx();
ObSEArray<TableLocationKey, 2> table_location_keys;
OZ(ObTaskSpliter::find_all_table_location_keys(table_location_keys, root_job_root_op));
for (int64_t i = 0; OB_SUCC(ret) && i < table_location_keys.count(); ++i) {
const TableLocationKey& table_location_key = table_location_keys.at(i);
const ObPhyTableLocation* table_loc = NULL;
OZ(ObTaskExecutorCtxUtil::get_phy_table_location(
task_exec_ctx, table_location_key.table_id_, table_location_key.ref_table_id_, table_loc));
CK(OB_NOT_NULL(table_loc));
OZ(append_participant_by_table_loc(participants, *table_loc));
}
return ret;
}
int ObSqlTransControl::get_root_job_participants(
ObExecContext& exec_ctx, const ObOperator& root_job_root_op, ObPartitionArray& participants)
{
int ret = OB_SUCCESS;
ObTaskExecutorCtx& task_exec_ctx = exec_ctx.get_task_exec_ctx();
ObSEArray<TableLocationKey, 2> table_location_keys;
OZ(ObTaskSpliter::find_all_table_location_keys(table_location_keys, root_job_root_op.get_spec()));
for (int64_t i = 0; OB_SUCC(ret) && i < table_location_keys.count(); ++i) {
const TableLocationKey& table_location_key = table_location_keys.at(i);
const ObPhyTableLocation* table_loc = NULL;
OZ(ObTaskExecutorCtxUtil::get_phy_table_location(
task_exec_ctx, table_location_key.table_id_, table_location_key.ref_table_id_, table_loc));
CK(OB_NOT_NULL(table_loc));
OZ(append_participant_by_table_loc(participants, *table_loc));
}
return ret;
}
int ObSqlTransControl::append_participant_by_table_loc(
ObPartitionIArray& participants, const ObPhyTableLocation& table_loc)
{
int ret = OB_SUCCESS;
const ObPartitionReplicaLocationIArray& part_locations = table_loc.get_partition_location_list();
int64_t M = part_locations.count();
ObPartitionKey key;
for (int64_t j = 0; OB_SUCC(ret) && j < M; ++j) {
key.reset();
if (OB_FAIL(part_locations.at(j).get_partition_key(key))) {
LOG_WARN("failed to get partition key", K(ret));
} else if (OB_UNLIKELY(true == is_virtual_table(key.table_id_))) {
} else if (OB_FAIL(append_participant_to_array_distinctly(participants, key))) {
/*
* In the case of multiple alias table corresponding to the same physical table, the
* key we get could be duplicate. To avoid having same partition as participants more
* than once, we need to check if we have already added the key in the list.
*/
LOG_WARN("fail to push back participant", K(ret), K(j), K(key));
}
}
return ret;
}
int ObSqlTransControl::append_participant_to_array_distinctly(
ObPartitionIArray& participants, const ObPartitionKey& key)
{
int ret = OB_SUCCESS;
bool found = false;
// check if the key already exists
for (int64_t i = 0; !found && i < participants.count(); i++) {
if (participants.at(i) == key) {
found = true;
}
}
// if not, add it
if (!found) {
ret = participants.push_back(key);
}
return ret;
}
int ObSqlTransControl::inc_session_ref(const ObSQLSessionInfo* my_session)
{
int ret = OB_SUCCESS;
if (OB_ISNULL(GCTX.session_mgr_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("invalid parameter", K(my_session), K(GCTX.session_mgr_), K(ret));
} else {
ret = GCTX.session_mgr_->inc_session_ref(my_session);
}
return ret;
}
int ObSqlTransControl::merge_stmt_partitions(ObExecContext& exec_ctx, ObSQLSessionInfo& session)
{
int ret = OB_SUCCESS;
ObPartitionArray total_partitions;
// ObSqlTransControl::get_participants() can merge partitions.
if (OB_FAIL(ObSqlTransControl::get_participants(exec_ctx, total_partitions))) {
LOG_WARN("get participants failed", K(ret));
} else if (OB_FAIL(session.get_trans_result().merge_total_partitions(total_partitions))) {
LOG_WARN("merge total participants failed", K(ret));
}
return ret;
}
int ObSqlTransControl::get_stmt_snapshot_info(
ObExecContext& exec_ctx, const bool is_cursor, transaction::ObTransDesc& trans_desc, ObTransSnapInfo& snap_info)
{
int ret = OB_SUCCESS;
ObTaskExecutorCtx* executor_ctx = GET_TASK_EXECUTOR_CTX(exec_ctx);
storage::ObPartitionService* ps = NULL;
transaction::ObTransService* txs = NULL;
CK(OB_NOT_NULL(executor_ctx));
CK(OB_NOT_NULL(ps = executor_ctx->get_partition_service()));
CK(OB_NOT_NULL(txs = ps->get_trans_service()));
OZ(txs->get_stmt_snapshot_info(is_cursor, trans_desc, snap_info));
return ret;
}
int ObSqlTransControl::start_standalone_stmt(
ObExecContext& exec_ctx, ObSQLSessionInfo& session_info, ObPhysicalPlanCtx& phy_plan_ctx, bool is_remote)
{
int ret = OB_SUCCESS;
LOG_DEBUG("start_standalone_stmt", "session_id", session_info.get_sessid());
ObTaskExecutorCtx* executor_ctx = GET_TASK_EXECUTOR_CTX(exec_ctx);
storage::ObPartitionService* ps = NULL;
transaction::ObTransService* txs = NULL;
bool is_local_single_partition_stmt = false;
const ObPhysicalPlan* phy_plan = phy_plan_ctx.get_phy_plan();
transaction::ObStandaloneStmtDesc& standalone_stmt_desc = session_info.get_trans_desc().get_standalone_stmt_desc();
int32_t consistency_level = ObTransConsistencyLevel::UNKNOWN;
int32_t consistency_type = ObTransConsistencyType::UNKNOWN;
int32_t read_snapshot_type = ObTransReadSnapshotType::UNKNOWN;
int64_t auto_spec_snapshot_version = ObTransVersion::INVALID_TRANS_VERSION;
CK(OB_NOT_NULL(executor_ctx));
CK(OB_NOT_NULL(ps = executor_ctx->get_partition_service()));
CK(OB_NOT_NULL(txs = ps->get_trans_service()));
CK(OB_NOT_NULL(phy_plan));
OZ(decide_trans_read_interface_specs("ObSqlTransControl::start_standalone_stmt",
session_info,
phy_plan_ctx.get_phy_plan()->get_stmt_type(),
phy_plan_ctx.get_phy_plan()->get_literal_stmt_type(),
phy_plan_ctx.get_phy_plan()->has_for_update(),
phy_plan_ctx.get_phy_plan()->is_contain_inner_table(),
phy_plan_ctx.get_consistency_level(),
phy_plan_ctx.get_phy_plan()->need_consistent_snapshot(),
consistency_level,
consistency_type,
read_snapshot_type));
if (GCTX.is_standby_cluster()) {
OZ(specify_stmt_snapshot_version_for_slave_cluster_sql_(session_info,
phy_plan_ctx.get_phy_plan()->get_literal_stmt_type(),
phy_plan_ctx.get_phy_plan()->is_contain_inner_table(),
consistency_type,
read_snapshot_type,
auto_spec_snapshot_version));
}
if (OB_SUCC(ret) && session_info.has_valid_read_snapshot_version()) {
auto_spec_snapshot_version = session_info.get_read_snapshot_version();
}
if (OB_SUCC(ret)) {
common::ObPartitionLeaderArray in_pla;
if (!(phy_plan->has_nested_sql() || OB_PHY_PLAN_UNCERTAIN == phy_plan->get_location_type())) {
OZ(ObSqlTransControl::get_participants(exec_ctx, in_pla));
if (OB_SUCC(ret)) {
is_local_single_partition_stmt = (in_pla.count() == 1 && in_pla.get_leaders().at(0) == MYADDR &&
read_snapshot_type == ObTransReadSnapshotType::STATEMENT_SNAPSHOT);
}
}
OZ(standalone_stmt_desc.init(txs->get_server(),
session_info.get_effective_tenant_id(),
get_stmt_timeout_ts(phy_plan_ctx),
session_info.get_trx_lock_timeout(),
is_local_single_partition_stmt,
consistency_type,
read_snapshot_type,
is_local_single_partition_stmt ? in_pla.get_partitions().at(0) : ObPartitionKey()));
}
if (OB_FAIL(ret)) {
} else if (standalone_stmt_desc.is_bounded_staleness_read() &&
!ObWeakReadUtil::enable_monotonic_weak_read(session_info.get_effective_tenant_id())) {
if (ObSqlTransUtil::plan_can_start_trans(session_info.get_local_autocommit(), session_info.get_in_transaction())) {
OZ(implicit_start_trans(exec_ctx, is_remote));
}
} else {
OZ(txs->get_stmt_snapshot_info(session_info.get_trans_desc(), auto_spec_snapshot_version));
if (standalone_stmt_desc.is_bounded_staleness_read()) {
OZ(update_safe_weak_read_snapshot(standalone_stmt_desc.is_bounded_staleness_read(),
session_info,
read_snapshot_type,
standalone_stmt_desc.get_snapshot_version()));
}
}
return ret;
}
bool ObSqlTransControl::is_isolation_RR_or_SE(int32_t isolation)
{
return (isolation == ObTransIsolation::REPEATABLE_READ || isolation == ObTransIsolation::SERIALIZABLE);
}
int ObSqlTransControl::start_cursor_stmt(ObExecContext& exec_ctx)
{
int ret = OB_SUCCESS;
ObTaskExecutorCtx* executor_ctx = GET_TASK_EXECUTOR_CTX(exec_ctx);
storage::ObPartitionService* ps = nullptr;
transaction::ObTransService* txs = nullptr;
ObSQLSessionInfo* session = nullptr;
ObPhysicalPlanCtx* plan_ctx = nullptr;
CK(OB_NOT_NULL(executor_ctx));
CK(OB_NOT_NULL(ps = executor_ctx->get_partition_service()));
CK(OB_NOT_NULL(txs = ps->get_trans_service()));
CK(OB_NOT_NULL(session = exec_ctx.get_my_session()));
CK(OB_NOT_NULL(plan_ctx = GET_PHY_PLAN_CTX(exec_ctx)));
OZ(txs->start_cursor_stmt(session->get_trans_desc(), plan_ctx->get_trans_timeout_timestamp()));
return ret;
}
} // namespace sql
} // namespace oceanbase