diff --git a/src/storage/tx/ob_tx_free_route.cpp b/src/storage/tx/ob_tx_free_route.cpp index 6d500c94f1..cf18e11aee 100644 --- a/src/storage/tx/ob_tx_free_route.cpp +++ b/src/storage/tx/ob_tx_free_route.cpp @@ -361,16 +361,21 @@ int ObTransService::txn_free_route__update_static_state(const uint32_t session_i return ret; } -int ObTransService::update_logic_clock_(const int64_t logic_clock) +// assumption: tx's lock is acquired +int ObTransService::update_logic_clock_(const int64_t logic_clock, const ObTxDesc *tx, const bool check_fallback) { // if logic clock drift too much, disconnect required int ret = OB_SUCCESS; int64_t one_day_us = 24L * 3600 * 1000 * 1000; - if (logic_clock - ObClockGenerator::getClock() > one_day_us || - ObClockGenerator::getClock() - logic_clock > one_day_us) { - // bug, invalid logic_clock value + if (logic_clock - ObClockGenerator::getClock() > one_day_us) { ret = OB_ERR_UNEXPECTED; - } else if (FALSE_IT(ObSequence::update_max_seq_no(logic_clock))) { + TRANS_LOG(WARN, "logic clock is fast more than 1 day", KR(ret), K(logic_clock)); + } else if (check_fallback && (ObClockGenerator::getClock() - logic_clock > one_day_us)) { + TRANS_LOG(WARN, "logic clock is slow more than 1 day", K(logic_clock), KPC(tx)); + if (OB_NOT_NULL(tx)) { tx->print_trace_(); } + } + if (OB_SUCC(ret)) { + ObSequence::update_max_seq_no(logic_clock); } return ret; } @@ -409,7 +414,7 @@ int ObTransService::txn_free_route__update_dynamic_state(const uint32_t session_ TRANS_LOG(ERROR, "tx id should be valid", K(ret), KPC(tx)); } else if (OB_FAIL(decode_i64(buf, len, pos, &logic_clock))) { TRANS_LOG(ERROR, "decode logic clock fail", K(ret)); - } else if (OB_FAIL(update_logic_clock_(logic_clock))) { + } else if (OB_FAIL(update_logic_clock_(logic_clock, tx, true))) { TRANS_LOG(ERROR, "update logic clock fail", K(ret)); } else if (OB_FAIL(tx->decode_dynamic_state(buf, len, pos))) { TRANS_LOG(ERROR, "decode dynamic state fail", K(ret)); @@ -522,7 +527,7 @@ int ObTransService::txn_free_route__update_extra_state(const uint32_t session_id audit_record.alloc_tx_ = add_tx; if (OB_FAIL(decode_i64(buf, len, pos, &logic_clock))) { TRANS_LOG(ERROR, "decode logic clock fail", K(ret)); - } else if (OB_FAIL(update_logic_clock_(logic_clock))) { + } else if (OB_FAIL(update_logic_clock_(logic_clock, NULL, false))) { TRANS_LOG(ERROR, "update logic clock fail", K(ret)); } if (OB_SUCC(ret) && add_tx && OB_FAIL(acquire_tx(tx, session_id))) { @@ -1008,8 +1013,6 @@ int ObTransService::tx_free_route_handle_push_state(const ObTxFreeRoutePushState } else if (OB_ISNULL(tx)) { ret = OB_ERR_UNEXPECTED; TRANS_LOG(WARN, "tx is null", K(ret)); - } else if (OB_FAIL(update_logic_clock_(state.logic_clock_))) { - TRANS_LOG(WARN, "update logic clock fail", K(ret)); } else { auto start_ts = ObTimeUtility::current_time(); const char *buf = state.buf_.ptr(); @@ -1021,38 +1024,42 @@ int ObTransService::tx_free_route_handle_push_state(const ObTxFreeRoutePushState int tmp_ret = OB_SUCCESS; uint64_t flag = 0; ObSpinLockGuard guard(tx->lock_); - tx_id = tx->tx_id_; - if (static_len > 0) { - flag |= 1 << 3; - int64_t pos = 0; - if (OB_TMP_FAIL(tx->decode_static_state(buf, static_len, pos))) { - TRANS_LOG(WARN, "decode static state fail", K(ret)); + if (OB_FAIL(update_logic_clock_(state.logic_clock_, tx, true))) { + TRANS_LOG(WARN, "update logic clock fail", K(ret)); + } else { + tx_id = tx->tx_id_; + if (static_len > 0) { + flag |= 1 << 3; + int64_t pos = 0; + if (OB_TMP_FAIL(tx->decode_static_state(buf, static_len, pos))) { + TRANS_LOG(WARN, "decode static state fail", K(ret)); + } + ret = COVER_SUCC(tmp_ret); } - ret = COVER_SUCC(tmp_ret); - } - if (dynamic_len > 0) { - flag |= 1 << 2; + if (dynamic_len > 0) { + flag |= 1 << 2; int64_t pos = 0; if (OB_TMP_FAIL(tx->decode_dynamic_state(buf + state.dynamic_offset_, dynamic_len, pos))) { TRANS_LOG(WARN, "decode dynamic state fail", K(ret)); } ret = COVER_SUCC(tmp_ret); - } - if (parts_len > 0) { - flag |= 1 << 1; - int64_t pos = 0; - if (OB_TMP_FAIL(tx->decode_parts_state(buf + state.parts_offset_, parts_len, pos))) { - TRANS_LOG(WARN, "decode parts state fail", K(ret)); } - ret = COVER_SUCC(tmp_ret); - } - if (extra_len > 0) { - flag |= 1 << 0; - int64_t pos = 0; - if (OB_TMP_FAIL(tx->decode_extra_state(buf + state.extra_offset_, extra_len, pos))) { - TRANS_LOG(WARN, "decode extra state fail", K(ret)); + if (parts_len > 0) { + flag |= 1 << 1; + int64_t pos = 0; + if (OB_TMP_FAIL(tx->decode_parts_state(buf + state.parts_offset_, parts_len, pos))) { + TRANS_LOG(WARN, "decode parts state fail", K(ret)); + } + ret = COVER_SUCC(tmp_ret); + } + if (extra_len > 0) { + flag |= 1 << 0; + int64_t pos = 0; + if (OB_TMP_FAIL(tx->decode_extra_state(buf + state.extra_offset_, extra_len, pos))) { + TRANS_LOG(WARN, "decode extra state fail", K(ret)); + } + ret = COVER_SUCC(tmp_ret); } - ret = COVER_SUCC(tmp_ret); } auto elapsed_us = ObTimeUtility::current_time() - start_ts; ObTransTraceLog &tlog = tx->get_tlog(); diff --git a/src/storage/tx/ob_tx_free_route_api.h b/src/storage/tx/ob_tx_free_route_api.h index f68676d477..7f65c6efec 100644 --- a/src/storage/tx/ob_tx_free_route_api.h +++ b/src/storage/tx/ob_tx_free_route_api.h @@ -13,7 +13,7 @@ int tx_free_route_handle_check_alive(const ObTxFreeRouteCheckAliveMsg &msg, cons int tx_free_route_handle_push_state(const ObTxFreeRoutePushState &msg); private: int clean_txn_state_(ObTxDesc *&tx, ObTxnFreeRouteCtx &ctx, const ObTransID &tx_id); -static int update_logic_clock_(const int64_t logic_clock); +static int update_logic_clock_(const int64_t logic_clock, const ObTxDesc *tx, const bool check_fallback); bool need_fallback_(ObTxDesc &tx, int64_t &state_size); int push_tx_state_to_remote_(ObTxDesc &tx, const ObAddr &txn_addr); int txn_free_route__sanity_check_fallback_(ObTxDesc *tx, ObTxnFreeRouteCtx &ctx);