[master][tx-route] skip check logic clock fallback for extra state

This commit is contained in:
chinaxing 2023-04-19 08:48:31 +00:00 committed by ob-robot
parent 1af73a7671
commit 4b874aff0a
2 changed files with 41 additions and 34 deletions

View File

@ -361,16 +361,21 @@ int ObTransService::txn_free_route__update_static_state(const uint32_t session_i
return ret;
}
int ObTransService::update_logic_clock_(const int64_t logic_clock)
// assumption: tx's lock is acquired
int ObTransService::update_logic_clock_(const int64_t logic_clock, const ObTxDesc *tx, const bool check_fallback)
{
// if logic clock drift too much, disconnect required
int ret = OB_SUCCESS;
int64_t one_day_us = 24L * 3600 * 1000 * 1000;
if (logic_clock - ObClockGenerator::getClock() > one_day_us ||
ObClockGenerator::getClock() - logic_clock > one_day_us) {
// bug, invalid logic_clock value
if (logic_clock - ObClockGenerator::getClock() > one_day_us) {
ret = OB_ERR_UNEXPECTED;
} else if (FALSE_IT(ObSequence::update_max_seq_no(logic_clock))) {
TRANS_LOG(WARN, "logic clock is fast more than 1 day", KR(ret), K(logic_clock));
} else if (check_fallback && (ObClockGenerator::getClock() - logic_clock > one_day_us)) {
TRANS_LOG(WARN, "logic clock is slow more than 1 day", K(logic_clock), KPC(tx));
if (OB_NOT_NULL(tx)) { tx->print_trace_(); }
}
if (OB_SUCC(ret)) {
ObSequence::update_max_seq_no(logic_clock);
}
return ret;
}
@ -409,7 +414,7 @@ int ObTransService::txn_free_route__update_dynamic_state(const uint32_t session_
TRANS_LOG(ERROR, "tx id should be valid", K(ret), KPC(tx));
} else if (OB_FAIL(decode_i64(buf, len, pos, &logic_clock))) {
TRANS_LOG(ERROR, "decode logic clock fail", K(ret));
} else if (OB_FAIL(update_logic_clock_(logic_clock))) {
} else if (OB_FAIL(update_logic_clock_(logic_clock, tx, true))) {
TRANS_LOG(ERROR, "update logic clock fail", K(ret));
} else if (OB_FAIL(tx->decode_dynamic_state(buf, len, pos))) {
TRANS_LOG(ERROR, "decode dynamic state fail", K(ret));
@ -522,7 +527,7 @@ int ObTransService::txn_free_route__update_extra_state(const uint32_t session_id
audit_record.alloc_tx_ = add_tx;
if (OB_FAIL(decode_i64(buf, len, pos, &logic_clock))) {
TRANS_LOG(ERROR, "decode logic clock fail", K(ret));
} else if (OB_FAIL(update_logic_clock_(logic_clock))) {
} else if (OB_FAIL(update_logic_clock_(logic_clock, NULL, false))) {
TRANS_LOG(ERROR, "update logic clock fail", K(ret));
}
if (OB_SUCC(ret) && add_tx && OB_FAIL(acquire_tx(tx, session_id))) {
@ -1008,8 +1013,6 @@ int ObTransService::tx_free_route_handle_push_state(const ObTxFreeRoutePushState
} else if (OB_ISNULL(tx)) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(WARN, "tx is null", K(ret));
} else if (OB_FAIL(update_logic_clock_(state.logic_clock_))) {
TRANS_LOG(WARN, "update logic clock fail", K(ret));
} else {
auto start_ts = ObTimeUtility::current_time();
const char *buf = state.buf_.ptr();
@ -1021,38 +1024,42 @@ int ObTransService::tx_free_route_handle_push_state(const ObTxFreeRoutePushState
int tmp_ret = OB_SUCCESS;
uint64_t flag = 0;
ObSpinLockGuard guard(tx->lock_);
tx_id = tx->tx_id_;
if (static_len > 0) {
flag |= 1 << 3;
int64_t pos = 0;
if (OB_TMP_FAIL(tx->decode_static_state(buf, static_len, pos))) {
TRANS_LOG(WARN, "decode static state fail", K(ret));
if (OB_FAIL(update_logic_clock_(state.logic_clock_, tx, true))) {
TRANS_LOG(WARN, "update logic clock fail", K(ret));
} else {
tx_id = tx->tx_id_;
if (static_len > 0) {
flag |= 1 << 3;
int64_t pos = 0;
if (OB_TMP_FAIL(tx->decode_static_state(buf, static_len, pos))) {
TRANS_LOG(WARN, "decode static state fail", K(ret));
}
ret = COVER_SUCC(tmp_ret);
}
ret = COVER_SUCC(tmp_ret);
}
if (dynamic_len > 0) {
flag |= 1 << 2;
if (dynamic_len > 0) {
flag |= 1 << 2;
int64_t pos = 0;
if (OB_TMP_FAIL(tx->decode_dynamic_state(buf + state.dynamic_offset_, dynamic_len, pos))) {
TRANS_LOG(WARN, "decode dynamic state fail", K(ret));
}
ret = COVER_SUCC(tmp_ret);
}
if (parts_len > 0) {
flag |= 1 << 1;
int64_t pos = 0;
if (OB_TMP_FAIL(tx->decode_parts_state(buf + state.parts_offset_, parts_len, pos))) {
TRANS_LOG(WARN, "decode parts state fail", K(ret));
}
ret = COVER_SUCC(tmp_ret);
}
if (extra_len > 0) {
flag |= 1 << 0;
int64_t pos = 0;
if (OB_TMP_FAIL(tx->decode_extra_state(buf + state.extra_offset_, extra_len, pos))) {
TRANS_LOG(WARN, "decode extra state fail", K(ret));
if (parts_len > 0) {
flag |= 1 << 1;
int64_t pos = 0;
if (OB_TMP_FAIL(tx->decode_parts_state(buf + state.parts_offset_, parts_len, pos))) {
TRANS_LOG(WARN, "decode parts state fail", K(ret));
}
ret = COVER_SUCC(tmp_ret);
}
if (extra_len > 0) {
flag |= 1 << 0;
int64_t pos = 0;
if (OB_TMP_FAIL(tx->decode_extra_state(buf + state.extra_offset_, extra_len, pos))) {
TRANS_LOG(WARN, "decode extra state fail", K(ret));
}
ret = COVER_SUCC(tmp_ret);
}
ret = COVER_SUCC(tmp_ret);
}
auto elapsed_us = ObTimeUtility::current_time() - start_ts;
ObTransTraceLog &tlog = tx->get_tlog();

View File

@ -13,7 +13,7 @@ int tx_free_route_handle_check_alive(const ObTxFreeRouteCheckAliveMsg &msg, cons
int tx_free_route_handle_push_state(const ObTxFreeRoutePushState &msg);
private:
int clean_txn_state_(ObTxDesc *&tx, ObTxnFreeRouteCtx &ctx, const ObTransID &tx_id);
static int update_logic_clock_(const int64_t logic_clock);
static int update_logic_clock_(const int64_t logic_clock, const ObTxDesc *tx, const bool check_fallback);
bool need_fallback_(ObTxDesc &tx, int64_t &state_size);
int push_tx_state_to_remote_(ObTxDesc &tx, const ObAddr &txn_addr);
int txn_free_route__sanity_check_fallback_(ObTxDesc *tx, ObTxnFreeRouteCtx &ctx);