[master] misc fix and refine
This commit is contained in:
@ -906,7 +906,7 @@ OB_INLINE int ObMPQuery::do_process(ObSQLSessionInfo &session,
|
||||
if (OB_ERR_PROXY_REROUTE == ret) {
|
||||
LOG_DEBUG("query should be rerouted", K(ret), K(async_resp_used));
|
||||
} else {
|
||||
LOG_WARN("query failed", K(ret), K(retry_ctrl_.need_retry()));
|
||||
LOG_WARN("query failed", K(ret), K(session), K(sql), K(retry_ctrl_.need_retry()));
|
||||
}
|
||||
// 当need_retry=false时,可能给客户端回过包了,可能还没有回过任何包。
|
||||
// 不过,可以确定:这个请求出错了,还没处理完。如果不是已经交给异步EndTrans收尾,
|
||||
|
||||
@ -157,7 +157,8 @@ void ObSql::stat()
|
||||
|| (allow_ps && stmt_type == stmt::StmtType::T_DEALLOCATE)) { \
|
||||
} else { \
|
||||
ret = OB_TRANS_FREE_ROUTE_NOT_SUPPORTED; \
|
||||
LOG_WARN("only DML stmt or SET command is supported to be executed on txn temporary node", KR(ret), K(stmt_type)); \
|
||||
LOG_WARN("only DML stmt or SET command is supported to be executed on txn temporary node", \
|
||||
KR(ret), K(stmt_type), K(session.get_txn_free_route_ctx()), K(session)); \
|
||||
} \
|
||||
} \
|
||||
}
|
||||
|
||||
@ -48,11 +48,14 @@
|
||||
LOG_WARN("session has been killed", KR(ret), KPC(session)); \
|
||||
}
|
||||
#endif
|
||||
#define CHECK_TX_FREE_ROUTE(session, ...) \
|
||||
#define CHECK_TX_FREE_ROUTE(exec_ctx, session, ...) \
|
||||
if (OB_SUCC(ret) && session->is_txn_free_route_temp()) { \
|
||||
__VA_ARGS__; \
|
||||
ret = OB_ERR_UNEXPECTED; \
|
||||
TRANS_LOG(ERROR, "trans act on txn temporary node", KR(ret), K(session->get_tx_id()), KPC(session)); \
|
||||
exec_ctx.set_need_disconnect(true); \
|
||||
TRANS_LOG(ERROR, "trans act on txn temporary node", KR(ret), \
|
||||
K(session->get_txn_free_route_ctx()), \
|
||||
K(session->get_tx_id()), KPC(session)); \
|
||||
}
|
||||
|
||||
namespace oceanbase
|
||||
@ -141,7 +144,7 @@ int ObSqlTransControl::explicit_start_trans(ObExecContext &ctx, const bool read_
|
||||
|
||||
CK (OB_NOT_NULL(plan_ctx), OB_NOT_NULL(session));
|
||||
CHECK_SESSION(session);
|
||||
CHECK_TX_FREE_ROUTE(session, cleanup = false);
|
||||
CHECK_TX_FREE_ROUTE(ctx, session, cleanup = false);
|
||||
if (OB_SUCC(ret) && session->is_in_transaction()) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
cleanup = false;
|
||||
@ -197,7 +200,7 @@ int ObSqlTransControl::implicit_end_trans(ObExecContext &exec_ctx,
|
||||
CK (OB_NOT_NULL(session));
|
||||
int64_t tx_id = 0;
|
||||
OX (tx_id = session->get_tx_id().get_id());
|
||||
CHECK_TX_FREE_ROUTE(session);
|
||||
CHECK_TX_FREE_ROUTE(exec_ctx, session);
|
||||
FLTSpanGuard(end_transaction);
|
||||
OZ(end_trans(exec_ctx, is_rollback, false, callback));
|
||||
FLT_SET_TAG(trans_id, tx_id);
|
||||
@ -218,7 +221,7 @@ int ObSqlTransControl::explicit_end_trans(ObExecContext &exec_ctx, const bool is
|
||||
if (OB_SUCC(ret) && session->get_tx_desc()) {
|
||||
txn_id = session->get_tx_desc()->tid();
|
||||
}
|
||||
CHECK_TX_FREE_ROUTE(session);
|
||||
CHECK_TX_FREE_ROUTE(exec_ctx, session);
|
||||
if (exec_ctx.is_end_trans_async()) {
|
||||
CK (OB_NOT_NULL(callback = &session->get_end_trans_cb()));
|
||||
}
|
||||
@ -684,7 +687,8 @@ int ObSqlTransControl::stmt_setup_savepoint_(ObSQLSessionInfo *session,
|
||||
#define CHECK_TXN_FREE_ROUTE_ALLOWED() \
|
||||
if (OB_SUCC(ret) && !session->is_inner() && session->is_txn_free_route_temp()) { \
|
||||
ret = OB_TRANS_FREE_ROUTE_NOT_SUPPORTED; \
|
||||
LOG_WARN("current stmt is not allowed executed on txn tmp node", K(ret), KPC(session)); \
|
||||
LOG_WARN("current stmt is not allowed executed on txn tmp node", K(ret), \
|
||||
K(session->get_txn_free_route_ctx()), KPC(session)); \
|
||||
}
|
||||
int ObSqlTransControl::create_savepoint(ObExecContext &exec_ctx,
|
||||
const ObString &sp_name)
|
||||
@ -1079,7 +1083,7 @@ int ObSqlTransControl::check_ls_readable(const uint64_t tenant_id,
|
||||
bool has_tx_desc = OB_NOT_NULL(tx_desc); \
|
||||
transaction::ObTransID prev_tx_id; \
|
||||
if (has_tx_desc) { prev_tx_id = session.get_tx_id(); } \
|
||||
OZ (txs->txn_free_route__update_##name##_state(session.get_sessid(), tx_desc, session.get_txn_free_route_ctx(), buf, len, pos)); \
|
||||
OZ (txs->txn_free_route__update_##name##_state(session.get_sessid(), tx_desc, session.get_txn_free_route_ctx(), buf, len, pos), session); \
|
||||
if (OB_SUCC(ret) && has_tx_desc && (OB_ISNULL(tx_desc) || tx_desc->get_tx_id() != prev_tx_id)) { \
|
||||
session.reset_tx_variable(); \
|
||||
} \
|
||||
@ -1104,8 +1108,10 @@ int ObSqlTransControl::check_ls_readable(const uint64_t tenant_id,
|
||||
transaction::ObTransService *txs = NULL; \
|
||||
MTL_SWITCH(session.get_effective_tenant_id()) { \
|
||||
OZ (get_tx_service(&session, txs)); \
|
||||
if (OB_SUCC(ret)) { \
|
||||
size = txs->txn_free_route__get_##name##_state_serialize_size(session.get_tx_desc(), session.get_txn_free_route_ctx()); \
|
||||
} \
|
||||
} \
|
||||
LOG_DEBUG("get-serialize-size-txn-state", K(session)); \
|
||||
return size; \
|
||||
}
|
||||
|
||||
@ -1437,6 +1437,7 @@ int ObTransService::build_tx_commit_msg_(const ObTxDesc &tx, ObTxCommitMsg &msg)
|
||||
msg.sender_addr_ = self_;
|
||||
msg.sender_ = share::SCHEDULER_LS;
|
||||
msg.cluster_id_ = tx.cluster_id_;
|
||||
msg.app_trace_info_ = tx.trace_info_.get_app_trace_info();
|
||||
msg.request_id_ = tx.op_sn_;
|
||||
if (OB_FAIL(msg.parts_.assign(tx.commit_parts_))) {
|
||||
TRANS_LOG(WARN, "assign parts fail", K(ret), K(tx));
|
||||
|
||||
@ -150,7 +150,7 @@ int ObTransService::release_tx(ObTxDesc &tx)
|
||||
#endif
|
||||
tx_desc_mgr_.revert(tx);
|
||||
} else {
|
||||
finalize_tx_(tx);
|
||||
ret = finalize_tx_(tx);
|
||||
tx_desc_mgr_.revert(tx);
|
||||
}
|
||||
}
|
||||
|
||||
@ -82,20 +82,29 @@ int ObTransService::clean_txn_state_(ObTxDesc *&tx, const ObTransID &tx_id)
|
||||
//
|
||||
// if such insanity happened, it may the proxy was corrupted, we should disconnect
|
||||
// with proxy, because such error can not been repaired via retry
|
||||
bool release_ref = false, release = false;
|
||||
{
|
||||
ObSpinLockGuard guard(tx->lock_);
|
||||
if (TX_START_OR_RESUME_LOCAL(tx) && tx->tx_id_ == tx_id) {
|
||||
if (TX_START_OR_RESUME_LOCAL(tx)) {
|
||||
if (tx->tx_id_ == tx_id) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
TRANS_LOG(ERROR, "!bug, try to clean txn state on txn start node", K(ret), KPC(tx));
|
||||
TRANS_LOG(ERROR, "try to clean txn state on txn start node", K(ret), KPC(tx));
|
||||
} else if (tx->is_in_tx()) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
TRANS_LOG(ERROR, "try to clean txn state while tx is active", K(ret), KPC(tx), K(tx_id));
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
if (tx->is_xa_trans() && tx->addr_ == self_) {
|
||||
// on XA orignal, release ref
|
||||
release_tx_ref(*tx);
|
||||
release_ref = true;
|
||||
} else {
|
||||
release_tx(*tx);
|
||||
release = true;
|
||||
}
|
||||
tx = NULL;
|
||||
}
|
||||
}
|
||||
if (release_ref) { release_tx_ref(*tx); tx = NULL; }
|
||||
if (release) { ret = release_tx(*tx); tx = NULL; }
|
||||
#ifndef NDEBUG
|
||||
TRANS_LOG(INFO, "[tx free route] clean-txn-state", K(ret));
|
||||
#endif
|
||||
@ -202,7 +211,7 @@ int ObTransService::txn_free_route__update_static_state(const uint32_t session_i
|
||||
audit_record.upd_term_ = true;
|
||||
audit_record.upd_clean_tx_ = OB_NOT_NULL(tx);
|
||||
if (OB_NOT_NULL(tx) && OB_FAIL(clean_txn_state_(tx, tx_id))) {
|
||||
TRANS_LOG(WARN, "cleanup prev txn state fail", K(ret), K(tx));
|
||||
TRANS_LOG(WARN, "cleanup prev txn state fail", K(ret), K(tx_id), K(tx));
|
||||
}
|
||||
} else if (flag.is_fallback_) {
|
||||
audit_record.upd_fallback_ = true;
|
||||
@ -490,7 +499,7 @@ int ObTransService::txn_free_route__update_extra_state(const uint32_t session_id
|
||||
}
|
||||
if (OB_FAIL(ret) && OB_NOT_NULL(tx)) {
|
||||
ObSpinLockGuard guard(tx->lock_);
|
||||
TRANS_LOG(WARN, "update state fail", K(ret), KPC(tx));
|
||||
TRANS_LOG(WARN, "update state fail", K(ret), K(session_id), KPC(tx));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -20,8 +20,8 @@ int ObTxFreeRouteCheckAliveP::process()
|
||||
transaction::ObTransID sess_tx_id;
|
||||
{
|
||||
sql::ObSQLSessionInfo *session = NULL;
|
||||
auto session_mgr = GCTX.session_mgr_;
|
||||
if (OB_FAIL(session_mgr->get_session(arg_.tx_sess_id_, session))) {
|
||||
sql::ObSessionGetterGuard guard(*GCTX.session_mgr_, arg_.tx_sess_id_);
|
||||
if (OB_FAIL(guard.get_session(session))) {
|
||||
if (OB_ENTRY_NOT_EXIST == ret) {
|
||||
ret = OB_SESSION_NOT_FOUND;
|
||||
} else {
|
||||
|
||||
Reference in New Issue
Block a user