[misc] remove auto keyword and lambda
This commit is contained in:
@ -75,19 +75,19 @@ using namespace share::detector;
|
||||
namespace sql
|
||||
{
|
||||
static int get_tx_service(ObBasicSessionInfo *session,
|
||||
transaction::ObTransService *&txs)
|
||||
ObTransService *&txs)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
auto effective_tenant_id = session->get_effective_tenant_id();
|
||||
uint64_t effective_tenant_id = session->get_effective_tenant_id();
|
||||
if (OB_NOT_NULL(session->get_tx_desc())) {
|
||||
auto tx_tenant_id = session->get_tx_desc()->get_tenant_id();
|
||||
uint64_t tx_tenant_id = session->get_tx_desc()->get_tenant_id();
|
||||
if (effective_tenant_id != tx_tenant_id) {
|
||||
ret = OB_TENANT_ID_NOT_MATCH;
|
||||
LOG_ERROR("effective_tenant_id not equals to tx_tenant_id", K(ret), K(effective_tenant_id), K(tx_tenant_id), KPC(session));
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
if (OB_ISNULL(txs = MTL_WITH_CHECK_TENANT(transaction::ObTransService*, effective_tenant_id))) {
|
||||
if (OB_ISNULL(txs = MTL_WITH_CHECK_TENANT(ObTransService*, effective_tenant_id))) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_ERROR("get_tx_service", K(ret), K(effective_tenant_id), K(MTL_ID()));
|
||||
}
|
||||
@ -95,12 +95,10 @@ static int get_tx_service(ObBasicSessionInfo *session,
|
||||
return ret;
|
||||
}
|
||||
|
||||
static inline int get_lock_service(uint64_t tenant_id,
|
||||
transaction::tablelock::ObTableLockService *&lock_service)
|
||||
static inline int get_lock_service(uint64_t tenant_id, tablelock::ObTableLockService *&lock_service)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
lock_service = MTL_WITH_CHECK_TENANT(transaction::tablelock::ObTableLockService*,
|
||||
tenant_id);
|
||||
lock_service = MTL_WITH_CHECK_TENANT(tablelock::ObTableLockService*, tenant_id);
|
||||
if (OB_ISNULL(lock_service)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_ERROR("get_lock_service", K(ret), K(tenant_id), K(MTL_ID()));
|
||||
@ -130,7 +128,7 @@ static inline int build_tx_param_(ObSQLSessionInfo *session, ObTxParam &p, const
|
||||
int ObSqlTransControl::create_stash_savepoint(ObExecContext &ctx, const ObString &name)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
transaction::ObTransService *txs = NULL;
|
||||
ObTransService *txs = NULL;
|
||||
ObSQLSessionInfo *session = GET_MY_SESSION(ctx);
|
||||
CK (OB_NOT_NULL(session));
|
||||
OZ (get_tx_service(session, txs));
|
||||
@ -144,7 +142,7 @@ int ObSqlTransControl::explicit_start_trans(ObExecContext &ctx, const bool read_
|
||||
int ret = OB_SUCCESS;
|
||||
ObPhysicalPlanCtx *plan_ctx = GET_PHY_PLAN_CTX(ctx);
|
||||
ObSQLSessionInfo *session = GET_MY_SESSION(ctx);
|
||||
transaction::ObTransService *txs = NULL;
|
||||
ObTransService *txs = NULL;
|
||||
uint64_t tenant_id = 0;
|
||||
ObTransID tx_id;
|
||||
bool cleanup = true;
|
||||
@ -162,7 +160,7 @@ int ObSqlTransControl::explicit_start_trans(ObExecContext &ctx, const bool read_
|
||||
|
||||
if (OB_SUCC(ret) && OB_NOT_NULL(session->get_tx_desc())) {
|
||||
ObSQLSessionInfo::LockGuard data_lock_guard(session->get_thread_data_lock());
|
||||
auto *tx_desc = session->get_tx_desc();
|
||||
ObTxDesc *tx_desc = session->get_tx_desc();
|
||||
if (tx_desc->get_tenant_id() != tenant_id) {
|
||||
LOG_ERROR("switch tenant but hold tx_desc", K(tenant_id), KPC(tx_desc));
|
||||
}
|
||||
@ -330,12 +328,11 @@ int ObSqlTransControl::kill_query_session(ObSQLSessionInfo &session,
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (session.get_in_transaction()) {
|
||||
transaction::ObTxDesc *tx_desc = session.get_tx_desc();
|
||||
auto tx_tenant_id = tx_desc->get_tenant_id();
|
||||
ObTxDesc *tx_desc = session.get_tx_desc();
|
||||
uint64_t tx_tenant_id = tx_desc->get_tenant_id();
|
||||
MTL_SWITCH(tx_tenant_id) {
|
||||
transaction::ObTransService *txs = NULL;
|
||||
CK(OB_NOT_NULL(txs = MTL_WITH_CHECK_TENANT(transaction::ObTransService*,
|
||||
tx_tenant_id)));
|
||||
ObTransService *txs = NULL;
|
||||
CK(OB_NOT_NULL(txs = MTL_WITH_CHECK_TENANT(ObTransService*, tx_tenant_id)));
|
||||
OZ(txs->interrupt(*tx_desc, OB_ERR_QUERY_INTERRUPTED),
|
||||
tx_desc->get_tx_id(), status);
|
||||
LOG_INFO("kill_query_session", K(ret), K(session), K(tx_desc->get_tx_id()),
|
||||
@ -348,7 +345,6 @@ int ObSqlTransControl::kill_query_session(ObSQLSessionInfo &session,
|
||||
int ObSqlTransControl::kill_idle_timeout_tx(ObSQLSessionInfo *session)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
using namespace oceanbase::transaction;
|
||||
if (!session->can_txn_free_route()) {
|
||||
ret = kill_tx(session, OB_TRANS_IDLE_TIMEOUT);
|
||||
}
|
||||
@ -359,23 +355,23 @@ int ObSqlTransControl::kill_tx(ObSQLSessionInfo *session, int cause)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (!session->get_is_deserialized() && session->is_in_transaction()) {
|
||||
auto session_id = session->get_sessid();
|
||||
uint32_t session_id = session->get_sessid();
|
||||
if (cause >= 0) {
|
||||
LOG_INFO("begin to kill tx", "caused_by", transaction::ObTxAbortCauseNames::of(cause), K(cause), K(session_id), KPC(session));
|
||||
LOG_INFO("begin to kill tx", "caused_by", ObTxAbortCauseNames::of(cause), K(cause), K(session_id), KPC(session));
|
||||
} else {
|
||||
LOG_INFO("begin to kill tx", "caused_by", common::ob_error_name(cause), K(cause), K(session_id), KPC(session));
|
||||
}
|
||||
transaction::ObTxDesc *tx_desc = session->get_tx_desc();
|
||||
auto tx_tenant_id = tx_desc->get_tenant_id();
|
||||
ObTxDesc *tx_desc = session->get_tx_desc();
|
||||
uint64_t tx_tenant_id = tx_desc->get_tenant_id();
|
||||
const ObTransID tx_id = tx_desc->get_tx_id();
|
||||
auto tx_free_route_tmp = session->is_txn_free_route_temp();
|
||||
bool tx_free_route_tmp = session->is_txn_free_route_temp();
|
||||
MTL_SWITCH(tx_tenant_id) {
|
||||
ObSQLSessionInfo::LockGuard data_lock_guard(session->get_thread_data_lock());
|
||||
if (tx_free_route_tmp) {
|
||||
// if XA-txn is on this server, we have acquired its ref, release ref
|
||||
// and disassocate with session
|
||||
if (tx_desc->is_xa_trans() && tx_desc->get_addr() == GCONF.self_addr_) {
|
||||
auto txs = MTL(transaction::ObTransService*);
|
||||
ObTransService *txs = MTL(ObTransService*);
|
||||
CK (OB_NOT_NULL(txs), session_id, tx_id);
|
||||
OZ (txs->release_tx_ref(*tx_desc), session_id, tx_id);
|
||||
session->get_tx_desc() = NULL;
|
||||
@ -383,7 +379,7 @@ int ObSqlTransControl::kill_tx(ObSQLSessionInfo *session, int cause)
|
||||
} else if (tx_desc->is_xa_trans()) {
|
||||
const transaction::ObXATransID xid = session->get_xid();
|
||||
const transaction::ObGlobalTxType global_tx_type = tx_desc->get_global_tx_type(xid);
|
||||
auto xas = MTL(transaction::ObXAService *);
|
||||
ObXAService *xas = MTL(ObXAService *);
|
||||
CK (OB_NOT_NULL(xas));
|
||||
if (transaction::ObGlobalTxType::XA_TRANS == global_tx_type) {
|
||||
OZ (xas->handle_terminate_for_xa_branch(session->get_xid(), tx_desc, session->get_xa_end_timeout_seconds()),
|
||||
@ -398,8 +394,8 @@ int ObSqlTransControl::kill_tx(ObSQLSessionInfo *session, int cause)
|
||||
}
|
||||
session->get_tx_desc() = NULL;
|
||||
} else {
|
||||
transaction::ObTransService *txs = NULL;
|
||||
CK(OB_NOT_NULL(txs = MTL_WITH_CHECK_TENANT(transaction::ObTransService*, tx_tenant_id)));
|
||||
ObTransService *txs = NULL;
|
||||
CK(OB_NOT_NULL(txs = MTL_WITH_CHECK_TENANT(ObTransService*, tx_tenant_id)));
|
||||
OZ(txs->abort_tx(*tx_desc, cause), *session, tx_desc->get_tx_id());
|
||||
}
|
||||
// NOTE that the tx_desc is set to NULL in xa case, DO NOT print anything in tx_desc
|
||||
@ -437,7 +433,7 @@ int ObSqlTransControl::do_end_trans_(ObSQLSessionInfo *session,
|
||||
ObEndTransAsyncCallback *callback)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
transaction::ObTxDesc *&tx_ptr = session->get_tx_desc();
|
||||
ObTxDesc *&tx_ptr = session->get_tx_desc();
|
||||
bool is_detector_exist = false;
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
const int64_t lcl_op_interval = GCONF._lcl_op_interval;
|
||||
@ -466,9 +462,9 @@ int ObSqlTransControl::do_end_trans_(ObSQLSessionInfo *session,
|
||||
* 1) tx will be aborted (if tx exist and not terminated)
|
||||
* 2) the callback will not been called
|
||||
*/
|
||||
transaction::ObTransService *txs = NULL;
|
||||
ObTransService *txs = NULL;
|
||||
uint64_t tenant_id = session->get_effective_tenant_id();
|
||||
auto &trace_info = session->get_ob_trace_info();
|
||||
const common::ObString &trace_info = session->get_ob_trace_info();
|
||||
if (OB_FAIL(get_tx_service(session, txs))) {
|
||||
LOG_ERROR("fail to get trans service", K(ret), K(tenant_id));
|
||||
} else if (is_rollback) {
|
||||
@ -535,7 +531,7 @@ int ObSqlTransControl::start_stmt(ObExecContext &exec_ctx)
|
||||
ObPhysicalPlanCtx *plan_ctx = GET_PHY_PLAN_CTX(exec_ctx);
|
||||
const ObPhysicalPlan *plan = plan_ctx->get_phy_plan();
|
||||
ObDASCtx &das_ctx = DAS_CTX(exec_ctx);
|
||||
transaction::ObTransService *txs = NULL;
|
||||
ObTransService *txs = NULL;
|
||||
uint64_t tenant_id = 0;
|
||||
CK (OB_NOT_NULL(session), OB_NOT_NULL(plan_ctx), OB_NOT_NULL(plan));
|
||||
OX (tenant_id = session->get_effective_tenant_id());
|
||||
@ -575,13 +571,14 @@ int ObSqlTransControl::start_stmt(ObExecContext &exec_ctx)
|
||||
// add snapshot info to AuditRecord
|
||||
if (OB_SUCC(ret)) {
|
||||
ObAuditRecordData &audit_record = session->get_raw_audit_record();
|
||||
auto &snapshot = das_ctx.get_snapshot();
|
||||
auto &ar_snapshot = audit_record.snapshot_;
|
||||
ar_snapshot.version_ = snapshot.core_.version_;
|
||||
ar_snapshot.tx_id_ = snapshot.core_.tx_id_.get_id();
|
||||
ar_snapshot.scn_ = snapshot.core_.scn_.cast_to_int();
|
||||
ObTxReadSnapshot &snapshot = das_ctx.get_snapshot();
|
||||
(void)snapshot.format_source_for_display(audit_record.snapshot_source_, sizeof(audit_record.snapshot_source_));
|
||||
ar_snapshot.source_ = audit_record.snapshot_source_;
|
||||
audit_record.snapshot_ = {
|
||||
.version_ = snapshot.core_.version_,
|
||||
.tx_id_ = snapshot.core_.tx_id_.get_id(),
|
||||
.scn_ = static_cast<int64_t>(snapshot.core_.scn_.cast_to_int()),
|
||||
.source_ = audit_record.snapshot_source_
|
||||
};
|
||||
audit_record.seq_num_ = ObSequence::get_max_seq_no();
|
||||
}
|
||||
if (OB_SUCC(ret) && !session->has_start_stmt()) {
|
||||
@ -615,14 +612,14 @@ bool print_log = false;
|
||||
if (print_log) {
|
||||
bool auto_commit = false;
|
||||
session->get_autocommit(auto_commit);
|
||||
auto plan_type = plan->get_location_type();
|
||||
auto stmt_type = plan->get_stmt_type();
|
||||
auto has_for_update = plan->has_for_update();
|
||||
auto use_das = plan->use_das();
|
||||
auto &trans_result = session->get_trans_result();
|
||||
auto query_start_time = session->get_query_start_time();
|
||||
auto &snapshot = das_ctx.get_snapshot();
|
||||
auto savepoint = das_ctx.get_savepoint();
|
||||
ObPhyPlanType plan_type = plan->get_location_type();
|
||||
stmt::StmtType stmt_type = plan->get_stmt_type();
|
||||
bool has_for_update = plan->has_for_update();
|
||||
bool use_das = plan->use_das();
|
||||
ObTxExecResult &trans_result = session->get_trans_result();
|
||||
int64_t query_start_time = session->get_query_start_time();
|
||||
ObTxReadSnapshot &snapshot = das_ctx.get_snapshot();
|
||||
ObTxSEQ savepoint = das_ctx.get_savepoint();
|
||||
LOG_INFO("start stmt", K(ret),
|
||||
K(auto_commit),
|
||||
K(session_id),
|
||||
@ -755,8 +752,8 @@ int ObSqlTransControl::stmt_sanity_check_(ObSQLSessionInfo *session,
|
||||
}
|
||||
|
||||
// check isolation with consistency type
|
||||
auto iso = session->get_tx_desc()->get_isolation_level();
|
||||
auto cl = plan_ctx->get_consistency_level();
|
||||
ObTxIsolationLevel iso = session->get_tx_desc()->get_isolation_level();
|
||||
ObConsistencyLevel cl = plan_ctx->get_consistency_level();
|
||||
if (ObConsistencyLevel::WEAK == cl &&
|
||||
(iso == ObTxIsolationLevel::SERIAL || iso == ObTxIsolationLevel::RR)) {
|
||||
ret = OB_NOT_SUPPORTED;
|
||||
@ -773,11 +770,11 @@ int ObSqlTransControl::stmt_setup_snapshot_(ObSQLSessionInfo *session,
|
||||
ObDASCtx &das_ctx,
|
||||
const ObPhysicalPlan *plan,
|
||||
const ObPhysicalPlanCtx *plan_ctx,
|
||||
transaction::ObTransService *txs)
|
||||
ObTransService *txs)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
auto cl = plan_ctx->get_consistency_level();
|
||||
auto &snapshot = das_ctx.get_snapshot();
|
||||
ObConsistencyLevel cl = plan_ctx->get_consistency_level();
|
||||
ObTxReadSnapshot &snapshot = das_ctx.get_snapshot();
|
||||
if (cl == ObConsistencyLevel::WEAK || cl == ObConsistencyLevel::FROZEN) {
|
||||
SCN snapshot_version = SCN::min_scn();
|
||||
const bool local_single_ls = plan->is_local_plan() &&
|
||||
@ -800,12 +797,12 @@ int ObSqlTransControl::stmt_setup_snapshot_(ObSQLSessionInfo *session,
|
||||
} else if (plan->is_plain_insert()
|
||||
&& session->get_tx_isolation() != ObTxIsolationLevel::SERIAL
|
||||
&& session->get_tx_isolation() != ObTxIsolationLevel::RR) {
|
||||
auto &tx_desc = *session->get_tx_desc();
|
||||
ObTxDesc &tx_desc = *session->get_tx_desc();
|
||||
snapshot.init_none_read();
|
||||
snapshot.core_.tx_id_ = tx_desc.get_tx_id();
|
||||
snapshot.core_.scn_ = tx_desc.get_tx_seq();
|
||||
} else {
|
||||
auto &tx_desc = *session->get_tx_desc();
|
||||
ObTxDesc &tx_desc = *session->get_tx_desc();
|
||||
int64_t stmt_expire_ts = get_stmt_expire_ts(plan_ctx, *session);
|
||||
share::ObLSID first_ls_id;
|
||||
bool local_single_ls_plan = false;
|
||||
@ -849,7 +846,7 @@ int ObSqlTransControl::stmt_refresh_snapshot(ObExecContext &exec_ctx) {
|
||||
ObDASCtx &das_ctx = DAS_CTX(exec_ctx);
|
||||
ObPhysicalPlanCtx *plan_ctx = GET_PHY_PLAN_CTX(exec_ctx);
|
||||
const ObPhysicalPlan *plan = plan_ctx->get_phy_plan();
|
||||
transaction::ObTransService *txs = NULL;
|
||||
ObTransService *txs = NULL;
|
||||
if (sql::stmt::T_INSERT == plan->get_stmt_type() || sql::stmt::T_INSERT_ALL == plan->get_stmt_type()) {
|
||||
//NOTE: oracle insert and insert all stmt can't see the evaluated results of before stmt trigger, no need to refresh snapshot
|
||||
} else if (OB_FAIL(get_tx_service(session, txs))) {
|
||||
@ -869,9 +866,9 @@ int ObSqlTransControl::set_fk_check_snapshot(ObExecContext &exec_ctx)
|
||||
const ObPhysicalPlan *plan = plan_ctx->get_phy_plan();
|
||||
// insert stmt does not set snapshot by default, set snapshopt for foreign key check induced by insert heres
|
||||
if (plan->is_plain_insert()) {
|
||||
transaction::ObTransService *txs = NULL;
|
||||
auto &snapshot = das_ctx.get_snapshot();
|
||||
auto &tx_desc = *session->get_tx_desc();
|
||||
ObTransService *txs = NULL;
|
||||
ObTxReadSnapshot &snapshot = das_ctx.get_snapshot();
|
||||
ObTxDesc &tx_desc = *session->get_tx_desc();
|
||||
int64_t stmt_expire_ts = get_stmt_expire_ts(plan_ctx, *session);
|
||||
share::ObLSID local_ls_id;
|
||||
bool local_single_ls_plan = plan->is_local_plan()
|
||||
@ -903,14 +900,14 @@ int ObSqlTransControl::set_fk_check_snapshot(ObExecContext &exec_ctx)
|
||||
int ObSqlTransControl::stmt_setup_savepoint_(ObSQLSessionInfo *session,
|
||||
ObDASCtx &das_ctx,
|
||||
ObPhysicalPlanCtx *plan_ctx,
|
||||
transaction::ObTransService* txs,
|
||||
ObTransService* txs,
|
||||
const int64_t nested_level)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObTxParam &tx_param = plan_ctx->get_trans_param();
|
||||
OZ (build_tx_param_(session, tx_param));
|
||||
auto &tx = *session->get_tx_desc();
|
||||
transaction::ObTxSEQ savepoint;
|
||||
ObTxDesc &tx = *session->get_tx_desc();
|
||||
ObTxSEQ savepoint;
|
||||
OZ (txs->create_implicit_savepoint(tx, tx_param, savepoint, nested_level == 0), tx, tx_param);
|
||||
OX (das_ctx.set_savepoint(savepoint));
|
||||
return ret;
|
||||
@ -928,7 +925,7 @@ int ObSqlTransControl::create_savepoint(ObExecContext &exec_ctx,
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObSQLSessionInfo *session = GET_MY_SESSION(exec_ctx);
|
||||
transaction::ObTransService *txs = NULL;
|
||||
ObTransService *txs = NULL;
|
||||
CK (OB_NOT_NULL(session));
|
||||
CHECK_SESSION (session);
|
||||
CHECK_TXN_FREE_ROUTE_ALLOWED();
|
||||
@ -972,7 +969,7 @@ int ObSqlTransControl::get_first_lsid(const ObDASCtx &das_ctx, share::ObLSID &fi
|
||||
}
|
||||
|
||||
bool ObSqlTransControl::has_same_lsid(const ObDASCtx &das_ctx,
|
||||
const transaction::ObTxReadSnapshot &snapshot,
|
||||
const ObTxReadSnapshot &snapshot,
|
||||
share::ObLSID &first_lsid)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
@ -1039,7 +1036,7 @@ bool ObSqlTransControl::has_same_lsid(const ObDASCtx &das_ctx,
|
||||
}
|
||||
|
||||
int ObSqlTransControl::start_hook_if_need_(ObSQLSessionInfo &session,
|
||||
transaction::ObTransService *txs,
|
||||
ObTransService *txs,
|
||||
bool &start_hook)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
@ -1056,7 +1053,7 @@ int ObSqlTransControl::rollback_savepoint(ObExecContext &exec_ctx,
|
||||
int ret = OB_SUCCESS;
|
||||
ObSQLSessionInfo *session = GET_MY_SESSION(exec_ctx);
|
||||
const ObPhysicalPlanCtx *plan_ctx = GET_PHY_PLAN_CTX(exec_ctx);
|
||||
transaction::ObTransService *txs = NULL;
|
||||
ObTransService *txs = NULL;
|
||||
int64_t stmt_expire_ts = 0;
|
||||
|
||||
CK (OB_NOT_NULL(session), OB_NOT_NULL(plan_ctx));
|
||||
@ -1086,7 +1083,7 @@ int ObSqlTransControl::release_stash_savepoint(ObExecContext &exec_ctx,
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObSQLSessionInfo *session = GET_MY_SESSION(exec_ctx);
|
||||
transaction::ObTransService *txs = NULL;
|
||||
ObTransService *txs = NULL;
|
||||
CK (OB_NOT_NULL(session));
|
||||
// NOTE: should _NOT_ check session is zombie, because the stash savepoint
|
||||
// should be release before query quit
|
||||
@ -1102,7 +1099,7 @@ int ObSqlTransControl::release_savepoint(ObExecContext &exec_ctx,
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObSQLSessionInfo *session = GET_MY_SESSION(exec_ctx);
|
||||
transaction::ObTransService *txs = NULL;
|
||||
ObTransService *txs = NULL;
|
||||
CK (OB_NOT_NULL(session));
|
||||
CHECK_SESSION (session);
|
||||
CHECK_TXN_FREE_ROUTE_ALLOWED();
|
||||
@ -1135,7 +1132,7 @@ int ObSqlTransControl::xa_rollback_all_changes(ObExecContext &exec_ctx)
|
||||
}
|
||||
CHECK_SESSION (session);
|
||||
OX (stmt_expire_ts = get_stmt_expire_ts(plan_ctx, *session));
|
||||
transaction::ObXAService * xa_service = MTL(transaction::ObXAService*);
|
||||
ObXAService * xa_service = MTL(ObXAService*);
|
||||
CK (OB_NOT_NULL(xa_service));
|
||||
OZ (xa_service->xa_rollback_all_changes(session->get_xid(),
|
||||
session->get_tx_desc(),
|
||||
@ -1149,14 +1146,14 @@ int ObSqlTransControl::end_stmt(ObExecContext &exec_ctx, const bool rollback)
|
||||
int ret = OB_SUCCESS;
|
||||
DISABLE_SQL_MEMLEAK_GUARD;
|
||||
ObSQLSessionInfo *session = GET_MY_SESSION(exec_ctx);
|
||||
auto *plan_ctx = GET_PHY_PLAN_CTX(exec_ctx);
|
||||
ObPhysicalPlanCtx *plan_ctx = GET_PHY_PLAN_CTX(exec_ctx);
|
||||
const ObPhysicalPlan *plan = NULL;
|
||||
ObDASCtx &das_ctx = DAS_CTX(exec_ctx);
|
||||
transaction::ObTransService *txs = NULL;
|
||||
transaction::ObTxDesc *tx_desc = NULL;
|
||||
sql::stmt::StmtType stmt_type = sql::stmt::StmtType::T_NONE;
|
||||
ObTransService *txs = NULL;
|
||||
ObTxDesc *tx_desc = NULL;
|
||||
stmt::StmtType stmt_type = stmt::StmtType::T_NONE;
|
||||
bool is_plain_select = false;
|
||||
transaction::ObTxSEQ savepoint = das_ctx.get_savepoint();
|
||||
ObTxSEQ savepoint = das_ctx.get_savepoint();
|
||||
int exec_errcode = exec_ctx.get_errcode();
|
||||
int64_t tx_id = 0;
|
||||
|
||||
@ -1173,7 +1170,7 @@ int ObSqlTransControl::end_stmt(ObExecContext &exec_ctx, const bool rollback)
|
||||
OX (tx_id_before_rollback = tx_desc->get_tx_id());
|
||||
tx_id = tx_id_before_rollback.get_id();
|
||||
OX (ObTransDeadlockDetectorAdapter::maintain_deadlock_info_when_end_stmt(exec_ctx, rollback));
|
||||
auto &tx_result = session->get_trans_result();
|
||||
ObTxExecResult &tx_result = session->get_trans_result();
|
||||
if (OB_FAIL(ret)) {
|
||||
} else if (OB_E(EventTable::EN_TX_RESULT_INCOMPLETE, session->get_sessid()) tx_result.is_incomplete()) {
|
||||
if (!rollback) {
|
||||
@ -1192,8 +1189,8 @@ int ObSqlTransControl::end_stmt(ObExecContext &exec_ctx, const bool rollback)
|
||||
ret = OB_TRANS_NEED_ROLLBACK;
|
||||
LOG_WARN("direct load failed, trans aborted", KR(ret));
|
||||
} else if (rollback) {
|
||||
auto stmt_expire_ts = get_stmt_expire_ts(plan_ctx, *session);
|
||||
auto &touched_ls = tx_result.get_touched_ls();
|
||||
int64_t stmt_expire_ts = get_stmt_expire_ts(plan_ctx, *session);
|
||||
const share::ObLSArray &touched_ls = tx_result.get_touched_ls();
|
||||
OZ (txs->rollback_to_implicit_savepoint(*tx_desc, savepoint, stmt_expire_ts, &touched_ls, exec_errcode),
|
||||
savepoint, stmt_expire_ts, touched_ls);
|
||||
// prioritize returning session error code
|
||||
@ -1273,10 +1270,10 @@ bool ObSqlTransControl::is_isolation_RR_or_SE(ObTxIsolationLevel isolation)
|
||||
|| isolation == ObTxIsolationLevel::SERIAL);
|
||||
}
|
||||
|
||||
int ObSqlTransControl::create_anonymous_savepoint(ObExecContext &exec_ctx, transaction::ObTxSEQ &savepoint)
|
||||
int ObSqlTransControl::create_anonymous_savepoint(ObExecContext &exec_ctx, ObTxSEQ &savepoint)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
transaction::ObTransService *txs = NULL;
|
||||
ObTransService *txs = NULL;
|
||||
ObSQLSessionInfo *session = GET_MY_SESSION(exec_ctx);
|
||||
CK (OB_NOT_NULL(session));
|
||||
OZ (get_tx_service(session, txs));
|
||||
@ -1287,11 +1284,11 @@ int ObSqlTransControl::create_anonymous_savepoint(ObExecContext &exec_ctx, trans
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObSqlTransControl::create_anonymous_savepoint(transaction::ObTxDesc &tx_desc, transaction::ObTxSEQ &savepoint)
|
||||
int ObSqlTransControl::create_anonymous_savepoint(ObTxDesc &tx_desc, ObTxSEQ &savepoint)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
transaction::ObTransService *txs = NULL;
|
||||
if (OB_ISNULL(txs = MTL_WITH_CHECK_TENANT(transaction::ObTransService*, tx_desc.get_tenant_id()))) {
|
||||
ObTransService *txs = NULL;
|
||||
if (OB_ISNULL(txs = MTL_WITH_CHECK_TENANT(ObTransService*, tx_desc.get_tenant_id()))) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_ERROR("get_tx_service", K(ret), K(tx_desc.get_tenant_id()), K(MTL_ID()));
|
||||
}
|
||||
@ -1299,12 +1296,12 @@ int ObSqlTransControl::create_anonymous_savepoint(transaction::ObTxDesc &tx_desc
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObSqlTransControl::rollback_savepoint(ObExecContext &exec_ctx, const transaction::ObTxSEQ savepoint)
|
||||
int ObSqlTransControl::rollback_savepoint(ObExecContext &exec_ctx, const ObTxSEQ savepoint)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObSQLSessionInfo *session = GET_MY_SESSION(exec_ctx);
|
||||
const ObPhysicalPlanCtx *plan_ctx = GET_PHY_PLAN_CTX(exec_ctx);
|
||||
transaction::ObTransService *txs = NULL;
|
||||
ObTransService *txs = NULL;
|
||||
int64_t expire_ts = 0;
|
||||
|
||||
CK (OB_NOT_NULL(session), OB_NOT_NULL(plan_ctx));
|
||||
@ -1319,11 +1316,10 @@ int ObSqlTransControl::rollback_savepoint(ObExecContext &exec_ctx, const transac
|
||||
* to Transaction Manager
|
||||
* @trans_result : managed by SQL layer and maybe non-empty before pass down.
|
||||
*/
|
||||
int ObSqlTransControl::get_trans_result(ObExecContext &exec_ctx,
|
||||
transaction::ObTxExecResult &trans_result)
|
||||
int ObSqlTransControl::get_trans_result(ObExecContext &exec_ctx, ObTxExecResult &trans_result)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
transaction::ObTransService *txs = NULL;
|
||||
ObTransService *txs = NULL;
|
||||
ObSQLSessionInfo *session = NULL;
|
||||
CK (OB_NOT_NULL(session = exec_ctx.get_my_session()));
|
||||
OZ (get_tx_service(session, txs));
|
||||
@ -1349,11 +1345,11 @@ int ObSqlTransControl::reset_session_tx_state(ObBasicSessionInfo *session,
|
||||
LOG_DEBUG("reset session tx state", KPC(session->get_tx_desc()), K(lbt()));
|
||||
if (OB_NOT_NULL(session->get_tx_desc())) {
|
||||
ObSQLSessionInfo::LockGuard data_lock_guard(session->get_thread_data_lock());
|
||||
auto &tx_desc = *session->get_tx_desc();
|
||||
auto tx_id = tx_desc.get_tx_id();
|
||||
auto effect_tid = session->get_effective_tenant_id();
|
||||
ObTxDesc &tx_desc = *session->get_tx_desc();
|
||||
ObTransID tx_id = tx_desc.get_tx_id();
|
||||
uint64_t effect_tid = session->get_effective_tenant_id();
|
||||
MTL_SWITCH(effect_tid) {
|
||||
transaction::ObTransService *txs = NULL;
|
||||
ObTransService *txs = NULL;
|
||||
OZ (get_tx_service(session, txs), *session, tx_desc);
|
||||
if (reuse_tx_desc) {
|
||||
if (OB_FAIL(txs->reuse_tx(tx_desc, data_version))) {
|
||||
@ -1376,7 +1372,7 @@ int ObSqlTransControl::reset_session_tx_state(ObSQLSessionInfo *session, bool re
|
||||
{
|
||||
int temp_ret = OB_SUCCESS;
|
||||
// cleanup txn level temp tables if this is the txn start node
|
||||
auto tx_desc = session->get_tx_desc();
|
||||
ObTxDesc *tx_desc = session->get_tx_desc();
|
||||
if (OB_NOT_NULL(tx_desc)
|
||||
&& tx_desc->with_temporary_table()
|
||||
&& tx_desc->get_addr() == GCONF.self_addr_) {
|
||||
@ -1411,7 +1407,7 @@ static int get_org_cluster_id_(ObSQLSessionInfo *session, int64_t &org_cluster_i
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObSqlTransControl::acquire_tx_if_need_(transaction::ObTransService *txs, ObSQLSessionInfo &session)
|
||||
int ObSqlTransControl::acquire_tx_if_need_(ObTransService *txs, ObSQLSessionInfo &session)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (OB_ISNULL(session.get_tx_desc())) {
|
||||
@ -1500,7 +1496,7 @@ int ObSqlTransControl::lock_table(ObExecContext &exec_ctx,
|
||||
|
||||
void ObSqlTransControl::clear_xa_branch(const ObXATransID &xid, ObTxDesc *&tx_desc)
|
||||
{
|
||||
MTL(transaction::ObXAService *)->clear_xa_branch(xid, tx_desc);
|
||||
MTL(ObXAService *)->clear_xa_branch(xid, tx_desc);
|
||||
}
|
||||
|
||||
|
||||
@ -1560,11 +1556,11 @@ int ObSqlTransControl::check_ls_readable(const uint64_t tenant_id,
|
||||
{ \
|
||||
int ret = OB_SUCCESS; \
|
||||
ObSQLSessionInfo::LockGuard data_lock_guard(session.get_thread_data_lock()); \
|
||||
transaction::ObTransService *txs = NULL; \
|
||||
ObTransService *txs = NULL; \
|
||||
OZ (get_tx_service(&session, txs)); \
|
||||
auto &tx_desc = session.get_tx_desc(); \
|
||||
ObTxDesc *&tx_desc = session.get_tx_desc(); \
|
||||
bool has_tx_desc = OB_NOT_NULL(tx_desc); \
|
||||
transaction::ObTransID prev_tx_id; \
|
||||
ObTransID prev_tx_id; \
|
||||
if (has_tx_desc) { prev_tx_id = session.get_tx_id(); } \
|
||||
OZ (txs->txn_free_route__update_##name##_state(session.get_sessid(), tx_desc, session.get_txn_free_route_ctx(), buf, len, pos, session.get_data_version()), session); \
|
||||
if (OB_SUCC(ret) && has_tx_desc && (OB_ISNULL(tx_desc) || tx_desc->get_tx_id() != prev_tx_id)) { \
|
||||
@ -1582,7 +1578,7 @@ int ObSqlTransControl::check_ls_readable(const uint64_t tenant_id,
|
||||
{ \
|
||||
int ret = OB_SUCCESS; \
|
||||
MTL_SWITCH(session.get_effective_tenant_id()) { \
|
||||
transaction::ObTransService *txs = NULL; \
|
||||
ObTransService *txs = NULL; \
|
||||
OZ (get_tx_service(&session, txs)); \
|
||||
OZ (txs->txn_free_route__serialize_##name##_state(session.get_sessid(), session.get_tx_desc(), session.get_txn_free_route_ctx(), buf, len, pos)); \
|
||||
} \
|
||||
@ -1593,7 +1589,7 @@ int ObSqlTransControl::check_ls_readable(const uint64_t tenant_id,
|
||||
{ \
|
||||
int ret = OB_SUCCESS; \
|
||||
int64_t size = -1; \
|
||||
transaction::ObTransService *txs = NULL; \
|
||||
ObTransService *txs = NULL; \
|
||||
MTL_SWITCH(session.get_effective_tenant_id()) { \
|
||||
OZ (get_tx_service(&session, txs)); \
|
||||
if (OB_SUCC(ret)) { \
|
||||
@ -1604,15 +1600,15 @@ int ObSqlTransControl::check_ls_readable(const uint64_t tenant_id,
|
||||
return size; \
|
||||
} \
|
||||
int64_t ObSqlTransControl::get_fetch_txn_##name##_state_size(ObSQLSessionInfo& sess) \
|
||||
{ return transaction::ObTransService::txn_free_route__get_##name##_state_size(sess.get_tx_desc()); } \
|
||||
{ return ObTransService::txn_free_route__get_##name##_state_size(sess.get_tx_desc()); } \
|
||||
int ObSqlTransControl::fetch_txn_##name##_state(ObSQLSessionInfo &sess, char *buf, const int64_t len, int64_t &pos) \
|
||||
{ return transaction::ObTransService::txn_free_route__get_##name##_state(sess.get_tx_desc(), sess.get_txn_free_route_ctx(), buf, len, pos); } \
|
||||
{ return ObTransService::txn_free_route__get_##name##_state(sess.get_tx_desc(), sess.get_txn_free_route_ctx(), buf, len, pos); } \
|
||||
int ObSqlTransControl::cmp_txn_##name##_state(const char* cur_buf, int64_t cur_len, const char* last_buf, int64_t last_len) \
|
||||
{ return transaction::ObTransService::txn_free_route__cmp_##name##_state(cur_buf, cur_len, last_buf, last_len); } \
|
||||
{ return ObTransService::txn_free_route__cmp_##name##_state(cur_buf, cur_len, last_buf, last_len); } \
|
||||
void ObSqlTransControl::display_txn_##name##_state(ObSQLSessionInfo &sess, const char* local_buf, const int64_t local_len, const char* remote_buf, const int64_t remote_len) \
|
||||
{ \
|
||||
transaction::ObTransService::txn_free_route__display_##name##_state("LOAL", local_buf, local_len); \
|
||||
transaction::ObTransService::txn_free_route__display_##name##_state("REMOTE", remote_buf, remote_len); \
|
||||
ObTransService::txn_free_route__display_##name##_state("LOAL", local_buf, local_len); \
|
||||
ObTransService::txn_free_route__display_##name##_state("REMOTE", remote_buf, remote_len); \
|
||||
ObTxDesc *tx = sess.get_tx_desc(); \
|
||||
if (OB_NOT_NULL(tx)) { \
|
||||
tx->print_trace(); \
|
||||
@ -1626,10 +1622,10 @@ DELEGATE_TO_TXN(extra);
|
||||
|
||||
#undef DELEGATE_TO_TXN
|
||||
|
||||
int ObSqlTransControl::calc_txn_free_route(ObSQLSessionInfo &session, transaction::ObTxnFreeRouteCtx &txn_free_route_ctx)
|
||||
int ObSqlTransControl::calc_txn_free_route(ObSQLSessionInfo &session, ObTxnFreeRouteCtx &txn_free_route_ctx)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
transaction::ObTransService *txs = NULL;
|
||||
ObTransService *txs = NULL;
|
||||
MTL_SWITCH(session.get_effective_tenant_id()) {
|
||||
OZ (get_tx_service(&session, txs));
|
||||
OZ (txs->calc_txn_free_route(session.get_tx_desc(), txn_free_route_ctx));
|
||||
@ -1637,13 +1633,13 @@ int ObSqlTransControl::calc_txn_free_route(ObSQLSessionInfo &session, transactio
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObSqlTransControl::check_free_route_tx_alive(ObSQLSessionInfo &session, transaction::ObTxnFreeRouteCtx &txn_free_route_ctx)
|
||||
int ObSqlTransControl::check_free_route_tx_alive(ObSQLSessionInfo &session, ObTxnFreeRouteCtx &txn_free_route_ctx)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
auto tx = session.get_tx_desc();
|
||||
ObTxDesc *tx = session.get_tx_desc();
|
||||
if (OB_NOT_NULL(tx)) {
|
||||
MTL_SWITCH(tx->get_tenant_id()) {
|
||||
transaction::ObTransService *txs = MTL(transaction::ObTransService*);
|
||||
ObTransService *txs = MTL(ObTransService*);
|
||||
CK (OB_NOT_NULL(txs));
|
||||
OZ (txs->tx_free_route_check_alive(txn_free_route_ctx, *tx, session.get_sessid()));
|
||||
}
|
||||
@ -1655,7 +1651,7 @@ int ObSqlTransControl::alloc_branch_id(ObExecContext &exec_ctx, const int64_t co
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObSQLSessionInfo *session = GET_MY_SESSION(exec_ctx);
|
||||
transaction::ObTxDesc *tx_desc = NULL;
|
||||
ObTxDesc *tx_desc = NULL;
|
||||
CK (OB_NOT_NULL(session));
|
||||
CK (OB_NOT_NULL(tx_desc = session->get_tx_desc()));
|
||||
OZ (tx_desc->alloc_branch_id(count, branch_id));
|
||||
|
||||
Reference in New Issue
Block a user