exchange some of free route trans_log with flog
This commit is contained in:
@ -9,6 +9,7 @@
|
||||
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
|
||||
* See the Mulan PubL v2 for more details.
|
||||
*/
|
||||
#define USING_LOG_PREFIX TRANS
|
||||
|
||||
#include "ob_trans_service.h"
|
||||
#include "lib/utility/serialization.h"
|
||||
@ -107,10 +108,10 @@ int ObTransService::clean_txn_state_(ObTxDesc *&tx, ObTxnFreeRouteCtx &ctx, cons
|
||||
if (ctx.txn_addr_ == self_) {
|
||||
if (tx->tx_id_ == tx_id) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
TRANS_LOG(ERROR, "try to clean txn state on txn start node", K(ret), KPC(tx));
|
||||
FLOG_ERROR("try to clean txn state on txn start node", K(ret), KPC(tx));
|
||||
} else if (tx->is_in_tx()) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
TRANS_LOG(ERROR, "try to clean txn state while tx is active", K(ret), KPC(tx), K(tx_id));
|
||||
FLOG_ERROR("try to clean txn state while tx is active", K(ret), KPC(tx), K(tx_id));
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
@ -143,7 +144,7 @@ int ObTransService::txn_free_route__sanity_check_fallback_(ObTxDesc *tx, ObTxnFr
|
||||
ObSpinLockGuard guard(tx->lock_);
|
||||
if (tx->addr_ != self_ && tx->xa_start_addr_ != self_) {
|
||||
ret = OB_TRANS_FREE_ROUTE_NOT_SUPPORTED;
|
||||
TRANS_LOG(ERROR, "tmp node receive fallback notify packet", K(ret), KPC(tx));
|
||||
FLOG_ERROR("tmp node receive fallback notify packet", K(ret), KPC(tx));
|
||||
} else if (!ctx.is_fallbacked_) {
|
||||
// if we receive an fallback flag from temporary-txn-node, set the ctx's fallbacked
|
||||
// indicate current txn has been fallbacked (by temporary-txn-node)
|
||||
@ -289,7 +290,7 @@ static int process_header_(TxStateHeader &header,
|
||||
} else if (OB_FAIL(header.decode(buf, len, pos))) {
|
||||
TRANS_LOG(ERROR, "decode header fail", K(ret));
|
||||
} else if (OB_FAIL(ctx.state_update_verify_by_version(cur_state, header.global_version_, header.backend_sess_id_, dup_sync))) {
|
||||
TRANS_LOG(WARN, "version verify failed", K(ret), K(header));
|
||||
FLOG_WARN("version verify failed", K(ret), K(header));
|
||||
} else if (!header.tx_id_.is_valid()) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
TRANS_LOG(ERROR, "tx id is invalid", K(ret));
|
||||
@ -350,7 +351,7 @@ int ObTransService::txn_free_route__handle_tx_exist_(const ObTransID &tx_id, ObT
|
||||
// some session hold this txn already, close the session and release this txn
|
||||
// then continue with retry
|
||||
uint32_t assoc_sess_id = tmp_tx->assoc_sess_id_;
|
||||
TRANS_LOG(WARN, "tx found associate with other session, will kill the session",
|
||||
FLOG_WARN("tx found associate with other session, will kill the session",
|
||||
K(assoc_sess_id), K(tx_id));
|
||||
if (OB_FAIL(txn_free_route__kill_session_(assoc_sess_id))) {
|
||||
TRANS_LOG(WARN, "kill old session failed", K(ret), K(assoc_sess_id));
|
||||
@ -407,7 +408,7 @@ int ObTransService::txn_free_route__update_static_state(const uint32_t session_i
|
||||
audit_record.upd_term_ = true;
|
||||
audit_record.upd_clean_tx_ = OB_NOT_NULL(tx);
|
||||
if (OB_NOT_NULL(tx) && OB_FAIL(clean_txn_state_(tx, ctx, header.tx_id_))) {
|
||||
TRANS_LOG(WARN, "cleanup prev txn state fail", K(ret), K(tx));
|
||||
FLOG_WARN("cleanup prev txn state fail", K(ret), K(tx));
|
||||
}
|
||||
} else if (header.flag_.is_fallback()) {
|
||||
audit_record.upd_fallback_ = true;
|
||||
@ -438,7 +439,7 @@ int ObTransService::txn_free_route__update_static_state(const uint32_t session_i
|
||||
if (tx->state_ != ObTxDesc::State::IDLE && !tx->is_xa_trans()) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
ObSpinLockGuard guard(tx->lock_);
|
||||
TRANS_LOG(ERROR, "txn static update must with IDLE state", K(ret), K(session_id), KPC(tx));
|
||||
FLOG_ERROR("txn static update must with IDLE state", K(ret), K(session_id), KPC(tx));
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
@ -451,7 +452,7 @@ int ObTransService::txn_free_route__update_static_state(const uint32_t session_i
|
||||
// receive static state which was born in this node, and its session is not current session
|
||||
// this can only happened for XA which xa_start on remote
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
TRANS_LOG(ERROR, "receive tx static-state born in this node of another session", K(ret),
|
||||
FLOG_ERROR("receive tx static-state born in this node of another session", K(ret),
|
||||
K(tx->sess_id_), K(session_id), KPC(tx));
|
||||
} else if (need_add_tx && !tx->is_xa_trans()
|
||||
&& OB_FAIL(tx_desc_mgr_.add_with_txid(tx->tx_id_, *tx))) {
|
||||
@ -481,7 +482,7 @@ int ObTransService::txn_free_route__update_static_state(const uint32_t session_i
|
||||
ctx.update_last_synced_state(TxnFreeRouteState::STATIC, header.backend_sess_id_, header.global_version_);
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
TRANS_LOG(WARN, "[tx-free-route::update_state]", K(ret), K(header), K(before_tx_id),
|
||||
FLOG_WARN("[tx-free-route::update_state]", K(ret), K(header), K(before_tx_id),
|
||||
K(session_id), K(ctx), KP(tx));
|
||||
}
|
||||
return ret;
|
||||
@ -523,21 +524,21 @@ int ObTransService::txn_free_route__update_dynamic_state(const uint32_t session_
|
||||
audit_record.upd_term_ = true;
|
||||
if (OB_NOT_NULL(tx)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
TRANS_LOG(ERROR, "tx should be null: released in static state update", K(ret), K(tx->tx_id_));
|
||||
FLOG_ERROR("tx should be null: released in static state update", K(ret), K(tx->tx_id_));
|
||||
}
|
||||
} else if (header.flag_.is_fallback()) {
|
||||
audit_record.upd_fallback_ = true;
|
||||
ret = txn_free_route__sanity_check_fallback_(tx, ctx);
|
||||
} else if (OB_ISNULL(tx)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
TRANS_LOG(ERROR, "tx should not be null", K(ret), K(session_id));
|
||||
FLOG_ERROR("tx should not be null", K(ret), K(session_id));
|
||||
} else {
|
||||
int64_t start_ts = ObTimeUtility::current_time();
|
||||
ObSpinLockGuard guard(tx->lock_);
|
||||
if (!tx->tx_id_.is_valid()) {
|
||||
// bug, dynamic state exist, txn should be active
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
TRANS_LOG(ERROR, "tx id should be valid", K(ret), KPC(tx));
|
||||
FLOG_ERROR("tx id should be valid", K(ret), KPC(tx));
|
||||
} else if (OB_FAIL(decode_i64(buf, len, pos, &logic_clock))) {
|
||||
TRANS_LOG(ERROR, "decode logic clock fail", K(ret));
|
||||
} else if (OB_FAIL(update_logic_clock_(logic_clock, tx, true))) {
|
||||
@ -559,7 +560,7 @@ int ObTransService::txn_free_route__update_dynamic_state(const uint32_t session_
|
||||
ctx.update_last_synced_state(TxnFreeRouteState::DYNAMIC, header.backend_sess_id_, header.global_version_);
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
TRANS_LOG(WARN, "[tx-free-route::update_state]", K(ret), K(header), K(logic_clock),
|
||||
FLOG_WARN("[tx-free-route::update_state]", K(ret), K(header), K(logic_clock),
|
||||
K(session_id), K(ctx), KP(tx));
|
||||
}
|
||||
return ret;
|
||||
@ -593,14 +594,14 @@ int ObTransService::txn_free_route__update_parts_state(const uint32_t session_id
|
||||
ret = txn_free_route__sanity_check_fallback_(tx, ctx);
|
||||
} else if (OB_ISNULL(tx)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
TRANS_LOG(ERROR, "tx should not be null", K(ret), K(session_id));
|
||||
FLOG_ERROR("tx should not be null", K(ret), K(session_id));
|
||||
} else {
|
||||
int64_t start_ts = ObTimeUtility::current_time();
|
||||
ObSpinLockGuard guard(tx->lock_);
|
||||
if (!tx->tx_id_.is_valid()) {
|
||||
// bug, dynamic state exist, txn should be active
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
TRANS_LOG(ERROR, "tx id should be active", K(ret), K(tx->tx_id_));
|
||||
FLOG_ERROR("tx id should be active", K(ret), K(tx->tx_id_));
|
||||
} else if (OB_FAIL(tx->decode_parts_state(buf, len, pos))) {
|
||||
TRANS_LOG(WARN, "decode participants fail", K(ret));
|
||||
}
|
||||
@ -617,7 +618,7 @@ int ObTransService::txn_free_route__update_parts_state(const uint32_t session_id
|
||||
ctx.update_last_synced_state(TxnFreeRouteState::PARTICIPANT, header.backend_sess_id_, header.global_version_);
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
TRANS_LOG(WARN, "[tx-free-route::update_state]", K(ret), K(header), K(session_id), K(ctx), KP(tx));
|
||||
FLOG_WARN("[tx-free-route::update_state]", K(ret), K(header), K(session_id), K(ctx), KP(tx));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
@ -670,7 +671,7 @@ int ObTransService::txn_free_route__update_extra_state(const uint32_t session_id
|
||||
if (OB_SUCC(ret) && replace_tx && tx->tx_id_.is_valid()) {
|
||||
if (OB_UNLIKELY(tx->in_tx_for_free_route())) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
TRANS_LOG(ERROR, "try overwrite tx which is active", K(ret), K(tx->tx_id_));
|
||||
FLOG_ERROR("try overwrite tx which is active", K(ret), K(tx->tx_id_));
|
||||
} else if (OB_FAIL(tx_desc_mgr_.remove(*tx))) {
|
||||
TRANS_LOG(WARN, "unregister old tx fail", K(ret), K(tx->tx_id_));
|
||||
}
|
||||
@ -707,11 +708,11 @@ int ObTransService::txn_free_route__update_extra_state(const uint32_t session_id
|
||||
ctx.update_last_synced_state(TxnFreeRouteState::EXTRA, header.backend_sess_id_, header.global_version_);
|
||||
}
|
||||
if (OB_FAIL(ret)) {
|
||||
TRANS_LOG(WARN, "[tx-free-route::update_state]", K(ret), K(header), K(logic_clock),
|
||||
FLOG_WARN("[tx-free-route::update_state]", K(ret), K(header), K(logic_clock),
|
||||
K(session_id), K(ctx), KP(tx));
|
||||
if (OB_NOT_NULL(tx)) {
|
||||
ObSpinLockGuard guard(tx->lock_);
|
||||
TRANS_LOG(INFO, "dump tx", KPC(tx));
|
||||
FLOG_INFO("dump tx", KPC(tx));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
@ -737,7 +738,7 @@ int ObTransService::txn_free_route__update_extra_state(const uint32_t session_id
|
||||
if (OB_SUCC(ret)) { \
|
||||
ObSpinLockGuard guard(tx->lock_); \
|
||||
if (OB_FAIL(tx->encode_##X##_state(buf, len, pos))) { \
|
||||
TRANS_LOG(WARN, "encode state fail", K(ret), KPC(tx)); \
|
||||
FLOG_WARN("encode state fail", K(ret), KPC(tx)); \
|
||||
} \
|
||||
} \
|
||||
}
|
||||
@ -942,7 +943,7 @@ int ObTransService::calc_txn_free_route(ObTxDesc *tx, ObTxnFreeRouteCtx &ctx)
|
||||
ret = OB_TRANS_KILLED;
|
||||
} else {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
TRANS_LOG(ERROR, "txn terminated not on txn start node", K(ret), K_(ctx.txn_addr), K_(self));
|
||||
FLOG_ERROR("txn terminated not on txn start node", K(ret), K_(ctx.txn_addr), K_(self));
|
||||
}
|
||||
} else if (ctx.can_free_route_) {
|
||||
return_terminated_state = true;
|
||||
@ -1009,7 +1010,7 @@ int ObTransService::calc_txn_free_route(ObTxDesc *tx, ObTxnFreeRouteCtx &ctx)
|
||||
changed |= (ctx.extra_changed_ = tx->is_extra_changed());
|
||||
if (is_tx_start && !changed) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
TRANS_LOG(ERROR, "txn start but none state changed", K(ret), KPC(tx), K(ctx));
|
||||
FLOG_ERROR("txn start but none state changed", K(ret), KPC(tx), K(ctx));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user