From 12e315115d1cbb958267e7d8416beb6f26bdf3fa Mon Sep 17 00:00:00 2001 From: zh0 Date: Wed, 28 Jul 2021 15:36:15 +0800 Subject: [PATCH] =?UTF-8?q?=E8=A7=A3=E5=86=B3=E4=BA=8B=E5=8A=A1=E6=B6=88?= =?UTF-8?q?=E6=81=AF=E9=87=8D=E6=9E=84=E5=BC=95=E5=85=A5=E7=9A=84=E7=8A=B6?= =?UTF-8?q?=E6=80=81=E6=9C=BA=E6=97=A0=E6=B3=95=E7=BB=93=E6=9D=9F=E7=9A=84?= =?UTF-8?q?=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/storage/transaction/ob_trans_ctx.cpp | 3 +- src/storage/transaction/ob_trans_ctx.h | 3 +- src/storage/transaction/ob_trans_part_ctx.cpp | 30 +++++++++++-------- src/storage/transaction/ob_trans_service.cpp | 3 ++ 4 files changed, 24 insertions(+), 15 deletions(-) diff --git a/src/storage/transaction/ob_trans_ctx.cpp b/src/storage/transaction/ob_trans_ctx.cpp index 4cc05ac06..936e05163 100644 --- a/src/storage/transaction/ob_trans_ctx.cpp +++ b/src/storage/transaction/ob_trans_ctx.cpp @@ -651,7 +651,8 @@ int ObDistTransCtx::init(const uint64_t tenant_id, const ObTransID& trans_id, co TRANS_LOG(WARN, "ObTransCtx inited error", KR(ret)); } else { set_state_(Ob2PCState::INIT); - trans_2pc_timeout_ = ObServerConfig::get_instance().trx_2pc_retry_interval; + trans_2pc_timeout_ = + std::min((int64_t)ObServerConfig::get_instance().trx_2pc_retry_interval, (int64_t)MAX_TRANS_2PC_TIMEOUT_US); } return ret; diff --git a/src/storage/transaction/ob_trans_ctx.h b/src/storage/transaction/ob_trans_ctx.h index 57d442a60..d7e216de2 100644 --- a/src/storage/transaction/ob_trans_ctx.h +++ b/src/storage/transaction/ob_trans_ctx.h @@ -491,9 +491,8 @@ public: // then the total size of trans_ctx preallocated by resource pool is 200B * 100 * 1000 = 20MB. // Taking the concurrency num into consideration, obviously, it is appropriate. static const int64_t RP_TOTAL_NUM = 100 * 1000; - -protected: static const int64_t MAX_TRANS_2PC_TIMEOUT_US = 3 * 1000 * 1000; // 3s +protected: // if 600 seconds after trans timeout, warn is required static const int64_t OB_TRANS_WARN_USE_TIME = 600 * 1000 * 1000; // 0x0078746365657266 means freectx diff --git a/src/storage/transaction/ob_trans_part_ctx.cpp b/src/storage/transaction/ob_trans_part_ctx.cpp index 9978ce9bf..981b1096b 100644 --- a/src/storage/transaction/ob_trans_part_ctx.cpp +++ b/src/storage/transaction/ob_trans_part_ctx.cpp @@ -1238,6 +1238,7 @@ int ObPartTransCtx::handle_message(const ObTransMsg& msg) } else { set_stc_by_now_(); } + const int64_t tmp_config = ObServerConfig::get_instance().trx_2pc_retry_interval; // TODO do not set scheduler/coordinator/participants if state is init if (OB_FAIL(set_app_trace_info_(msg.get_app_trace_info()))) { TRANS_LOG(WARN, "set app trace info error", K(ret), K(msg), K(*this)); @@ -1252,9 +1253,9 @@ int ObPartTransCtx::handle_message(const ObTransMsg& msg) } else if (OB_FAIL(handle_2pc_prepare_request_(msg))) { TRANS_LOG(WARN, "handle 2pc preprare request error", KR(ret), "context", *this, K(msg)); } else if (OB_FAIL(unregister_timeout_task_())) { - TRANS_LOG(WARN, "unregister timeout handler error", KR(ret), "context", *this); - } else if (OB_FAIL(register_timeout_task_(ObServerConfig::get_instance().trx_2pc_retry_interval))) { - TRANS_LOG(WARN, "register timeout handler error", KR(ret), "context", *this); + TRANS_LOG(WARN, "unregister timeout handler error", K(ret), "context", *this); + } else if (OB_FAIL(register_timeout_task_(std::min(tmp_config, (int64_t)MAX_TRANS_2PC_TIMEOUT_US)))) { + TRANS_LOG(WARN, "register timeout handler error", K(ret), "context", *this); } else { // do nothing } @@ -2531,19 +2532,20 @@ int ObPartTransCtx::leader_active(const LeaderActiveArg& arg) // The request_id_ should be initialized to prevent the 2pc cannot be // driven if all participants transferring the leader generate_request_id_(); + trans_2pc_timeout_ = ObServerConfig::get_instance().trx_2pc_retry_interval; if (Ob2PCState::INIT == get_state_()) { const int64_t left_time = trans_expired_time_ - ObClockGenerator::getRealClock(); if (left_time > 0) { trans_2pc_timeout_ = left_time; } else { - trans_2pc_timeout_ = ObServerConfig::get_instance().trx_2pc_retry_interval; + trans_2pc_timeout_ = std::min(trans_2pc_timeout_, (int64_t)MAX_TRANS_2PC_TIMEOUT_US); } // The XA txn has replayed the last redo log if (is_xa_local_trans() && is_redo_prepared_) { - trans_2pc_timeout_ = ObServerConfig::get_instance().trx_2pc_retry_interval; + trans_2pc_timeout_ = std::min(trans_2pc_timeout_, (int64_t)MAX_TRANS_2PC_TIMEOUT_US); } } else { - trans_2pc_timeout_ = ObServerConfig::get_instance().trx_2pc_retry_interval; + trans_2pc_timeout_ = std::min(trans_2pc_timeout_, (int64_t)MAX_TRANS_2PC_TIMEOUT_US); } // do not post transaction message to avoid deadlock, bug#8257026 // just register timeout task @@ -2739,14 +2741,15 @@ int ObPartTransCtx::commit(const bool is_rollback, sql::ObIEndTransCallback* cb, // - If the txn shouldnot keep the dependency with the predecessors, rollback the txn immediately // - If the dependecy is necessary, write the OB_LOG_SP_ELR_TRANS_COMMIT // - Current implementation write the OB_LOG_SP_ELR_TRANS_COMMIT if the predecessors exit + const int64_t tmp_config = ObServerConfig::get_instance().trx_2pc_retry_interval; if (OB_FAIL(set_app_trace_info_(app_trace_info))) { TRANS_LOG(WARN, "set app trace info error", K(ret), K(app_trace_info), K(*this)); } else if (OB_FAIL(generate_sp_commit_log_type_(log_type))) { TRANS_LOG(WARN, "generate sp commit log type error", K(ret), "context", *this); } else if (OB_FAIL(unregister_timeout_task_())) { - TRANS_LOG(WARN, "unregister timeout handler error", KR(ret), "context", *this); + TRANS_LOG(WARN, "unregister timeout handler error", K(ret), "context", *this); } else if (OB_FAIL(register_timeout_task_( - ObServerConfig::get_instance().trx_2pc_retry_interval + trans_id_.hash() % usec_per_sec))) { + std::min(tmp_config, (int64_t)MAX_TRANS_2PC_TIMEOUT_US) + trans_id_.hash() % usec_per_sec))) { TRANS_LOG(WARN, "register timeout handler error", KR(ret), "context", *this); } else if (OB_FAIL(submit_log_async_(log_type, has_redo_log))) { TRANS_LOG(WARN, "submit sp log error", KR(ret), "context", *this); @@ -6144,10 +6147,11 @@ int ObPartTransCtx::generate_redo_prepare_log_info(char* buf, const int64_t size // Record the commit version of the txn. If you can elr later, you // need to record it in the trans_result_info_mgr global_trans_version_ = commit_version; + int64_t tmp_config = ObServerConfig::get_instance().trx_2pc_retry_interval; if (OB_FAIL(unregister_timeout_task_())) { - TRANS_LOG(WARN, "unregister timeout handler error", KR(ret), "context", *this); - } else if (OB_FAIL(register_timeout_task_(ObServerConfig::get_instance().trx_2pc_retry_interval))) { - TRANS_LOG(WARN, "register timeout handler error", KR(ret), "context", *this); + TRANS_LOG(WARN, "unregister timeout handler error", K(ret), "context", *this); + } else if (OB_FAIL(register_timeout_task_(std::min(tmp_config, (int64_t)MAX_TRANS_2PC_TIMEOUT_US)))) { + TRANS_LOG(WARN, "register timeout handler error", K(ret), "context", *this); } else { // do nothing } @@ -8363,9 +8367,10 @@ int ObPartTransCtx::handle_2pc_request(const ObTrxMsgBase& msg, const int64_t ms } } if (OB_SUCC(ret)) { + const int64_t tmp_config = ObServerConfig::get_instance().trx_2pc_retry_interval; if (OB_FAIL(unregister_timeout_task_())) { TRANS_LOG(WARN, "unregister timeout handler error", K(ret), K(*this)); - } else if (OB_FAIL(register_timeout_task_(ObServerConfig::get_instance().trx_2pc_retry_interval))) { + } else if (OB_FAIL(register_timeout_task_(std::min(tmp_config, (int64_t)MAX_TRANS_2PC_TIMEOUT_US)))) { TRANS_LOG(WARN, "register timeout handler error", K(ret), K(*this)); } else { // do nothing @@ -8766,6 +8771,7 @@ int ObPartTransCtx::on_prepare_(const bool batch_committed, const int64_t timest set_state_(Ob2PCState::PREPARE); is_redo_prepared_ = true; trans_2pc_timeout_ = ObServerConfig::get_instance().trx_2pc_retry_interval; + trans_2pc_timeout_ = std::min(trans_2pc_timeout_, (int64_t)MAX_TRANS_2PC_TIMEOUT_US); if (batch_committed) { // Set up end_log_ts_ for 1pc end_log_ts_ = timestamp; diff --git a/src/storage/transaction/ob_trans_service.cpp b/src/storage/transaction/ob_trans_service.cpp index 01efb8fa7..13e09ab85 100644 --- a/src/storage/transaction/ob_trans_service.cpp +++ b/src/storage/transaction/ob_trans_service.cpp @@ -3451,6 +3451,9 @@ int ObTransService::check_partition_status(const ObPartitionKey& partition) TRANS_LOG(WARN, "get participant status error", K(ret), K(partition)); } else if (OB_SUCCESS != clog_status) { ret = clog_status; + if (REACH_TIME_INTERVAL(ObTransCtx::MAX_TRANS_2PC_TIMEOUT_US)) { + (void)refresh_location_cache(partition, false); + } } else { // do nothing }