From f9b9ce2307359e488b25f616adfbdd37aa0ca7c2 Mon Sep 17 00:00:00 2001 From: Handora Date: Thu, 3 Aug 2023 02:42:36 +0000 Subject: [PATCH] [BUG] ignore 4377 when participants die --- deps/oblib/src/rpc/obrpc/ob_rpc_packet_list.h | 3 + src/libtable/test/ob_batch_execute_test.cpp | 12 ++++ src/observer/ob_srv_xlator_primary.cpp | 2 + src/storage/ls/ob_ls_tablet_service.cpp | 47 +++++++++++++ src/storage/ls/ob_ls_tablet_service.h | 1 + src/storage/tx/ob_trans_part_ctx.cpp | 24 +++++++ src/storage/tx/ob_trans_part_ctx.h | 3 + src/storage/tx/ob_trans_rpc.cpp | 60 ++++++++++++++++ src/storage/tx/ob_trans_rpc.h | 15 +++- src/storage/tx/ob_trans_service_v4.cpp | 68 +++++++++++++++++++ src/storage/tx/ob_trans_service_v4.h | 8 +++ src/storage/tx/ob_tx_msg.cpp | 18 +++++ src/storage/tx/ob_tx_msg.h | 29 ++++++++ .../storage/tx/mock_utils/ob_fake_tx_rpc.h | 5 ++ 14 files changed, 293 insertions(+), 2 deletions(-) diff --git a/deps/oblib/src/rpc/obrpc/ob_rpc_packet_list.h b/deps/oblib/src/rpc/obrpc/ob_rpc_packet_list.h index abe61311d..d7a0ce971 100644 --- a/deps/oblib/src/rpc/obrpc/ob_rpc_packet_list.h +++ b/deps/oblib/src/rpc/obrpc/ob_rpc_packet_list.h @@ -698,6 +698,9 @@ PCODE_DEF(OB_XA_END_STMT_REQ, 0x772) PCODE_DEF(OB_XA_ROLLBACK, 0x773) PCODE_DEF(OB_XA_TERMINATE, 0x774) +// tx state check for 4377 +PCODE_DEF(OB_ASK_TX_STATE_FOR_4377, 0x780) + // clog PCODE_DEF(OB_CLOG, 0x801) diff --git a/src/libtable/test/ob_batch_execute_test.cpp b/src/libtable/test/ob_batch_execute_test.cpp index a7b086e44..aec1d774a 100644 --- a/src/libtable/test/ob_batch_execute_test.cpp +++ b/src/libtable/test/ob_batch_execute_test.cpp @@ -23,6 +23,7 @@ #include "observer/table/ob_htable_utils.h" #include "observer/table/ob_htable_filter_operator.h" #include "observer/table/ob_table_service.h" +#include "storage/ls/ob_ls_tablet_service.h" #include #undef private #undef protected @@ -43,6 +44,17 @@ const char* table_name = "batch_execute_test"; const char* sys_root_pass = ""; typedef char 
DefaultBuf[128]; +namespace oceanbase +{ +namespace storage +{ +int ObLSTabletService::check_parts_tx_state_in_transfer_for_4377_(transaction::ObTxDesc *) +{ + return OB_SUCCESS; +} +} +} + // create table if not exists batch_execute_test (C1 bigint primary key, C2 bigint, C3 varchar(100)) PARTITION BY KEY(C1) PARTITIONS 16 class TestBatchExecute: public ::testing::Test { diff --git a/src/observer/ob_srv_xlator_primary.cpp b/src/observer/ob_srv_xlator_primary.cpp index 646be3218..7292b47d2 100644 --- a/src/observer/ob_srv_xlator_primary.cpp +++ b/src/observer/ob_srv_xlator_primary.cpp @@ -162,6 +162,8 @@ void oceanbase::observer::init_srv_xlator_for_transaction(ObSrvRpcXlator *xlator RPC_PROCESSOR(ObTxFreeRouteCheckAliveP); RPC_PROCESSOR(ObTxFreeRouteCheckAliveRespP); RPC_PROCESSOR(ObTxFreeRoutePushStateP); + // for tx state check of 4377 + RPC_PROCESSOR(ObAskTxStateFor4377P); } void oceanbase::observer::init_srv_xlator_for_clog(ObSrvRpcXlator *xlator) { diff --git a/src/storage/ls/ob_ls_tablet_service.cpp b/src/storage/ls/ob_ls_tablet_service.cpp index 28c238bee..c5b018642 100755 --- a/src/storage/ls/ob_ls_tablet_service.cpp +++ b/src/storage/ls/ob_ls_tablet_service.cpp @@ -3676,6 +3676,9 @@ int ObLSTabletService::check_old_row_legitimacy( data_tablet_handle))) { ret = tmp_ret; LOG_WARN("check need rollback in transfer for 4377 found exception", K(ret), K(old_row), K(data_table)); + } else if (OB_TMP_FAIL(check_parts_tx_state_in_transfer_for_4377_(run_ctx.store_ctx_.mvcc_acc_ctx_.tx_desc_))) { + ret = tmp_ret; + LOG_WARN("check need rollback in transfer for 4377 found exception", K(ret), K(old_row), K(data_table)); } else if (is_udf) { ret = OB_ERR_INDEX_KEY_NOT_FOUND; LOG_WARN("index key not found on udf column", K(ret), K(old_row)); @@ -6231,6 +6234,7 @@ int ObLSTabletService::ha_get_tablet( } return ret; } + int ObLSTabletService::check_real_leader_for_4377_(const ObLSID ls_id) { int ret = OB_SUCCESS; @@ -6314,5 +6318,48 @@ int 
ObLSTabletService::check_need_rollback_in_transfer_for_4377_(const transacti return ret; } +int ObLSTabletService::check_parts_tx_state_in_transfer_for_4377_(transaction::ObTxDesc *tx_desc) +{ + int ret = OB_SUCCESS; + transaction::ObTransService *txs = MTL(transaction::ObTransService *); + transaction::ObTxPartList copy_parts; + bool is_alive = false; + + if (OB_ISNULL(tx_desc)) { + ret = OB_ERR_UNEXPECTED; + LOG_ERROR("tx_desc is null", K(ret)); + } else if (OB_FAIL(tx_desc->get_parts_copy(copy_parts))) { + TRANS_LOG(WARN, "get participants copy error", K(ret)); + } else { + // We can verify whether transaction data is incomplete due to the + // transfer by checking the local tablet. However, in the scenario where we + // validate the index table through the main table, we cannot discover + // whether the main table's txn data is incomplete due to the transfer by + // using the local index table. Furthermore, the local index table and the + // main table may not even be located on the same machine. + // + // Therefore, we choose to traverse all participants through RPC to + // confirm whether a txn context that could potentially participate in the + // transfer has been terminated due to the transfer. In this scenario, + // we consider the occurrence of 4377 as misreporting, and we convert it + // into OB_TRANS_NEED_ROLLBACK to indicate the need to roll back the txn. 
+ for (int64_t i = 0; OB_SUCC(ret) && i < copy_parts.count(); i++) { + ObLSID ls_id = copy_parts[i].id_; + is_alive = false; + if (OB_FAIL(txs->ask_tx_state_for_4377(ls_id, + tx_desc->get_tx_id(), + is_alive))) { + TRANS_LOG(WARN, "fail to ask tx state for 4377", K(tx_desc)); + } else if (!is_alive) { + ret = OB_TRANS_NEED_ROLLBACK; + LOG_WARN("maybe meet transfer during kill tx, which can cause 4377 error, so we will rollback it", + K(tx_desc)); + } + } + } + + return ret; +} + } // namespace storage } // namespace oceanbase diff --git a/src/storage/ls/ob_ls_tablet_service.h b/src/storage/ls/ob_ls_tablet_service.h index be1afe527..68415a6e5 100755 --- a/src/storage/ls/ob_ls_tablet_service.h +++ b/src/storage/ls/ob_ls_tablet_service.h @@ -514,6 +514,7 @@ private: static int check_real_leader_for_4377_(const ObLSID ls_id); static int check_need_rollback_in_transfer_for_4377_(const transaction::ObTxDesc *tx_desc, ObTabletHandle &tablet_handle); + static int check_parts_tx_state_in_transfer_for_4377_(transaction::ObTxDesc *tx_desc); static int build_create_sstable_param_for_migration( const blocksstable::ObMigrationSSTableParam &migrate_sstable_param, ObTabletCreateSSTableParam &create_sstable_param); diff --git a/src/storage/tx/ob_trans_part_ctx.cpp b/src/storage/tx/ob_trans_part_ctx.cpp index e7a1b4bb9..6b32c6878 100755 --- a/src/storage/tx/ob_trans_part_ctx.cpp +++ b/src/storage/tx/ob_trans_part_ctx.cpp @@ -8052,5 +8052,29 @@ int ObPartTransCtx::handle_trans_collect_state_resp(const ObCollectStateRespMsg return ret; } +int ObPartTransCtx::handle_ask_tx_state_for_4377(bool &is_alive) +{ + int ret = OB_SUCCESS; + CtxLockGuard guard(lock_); + + if (IS_NOT_INIT) { + is_alive = false; + TRANS_LOG(WARN, "ObPartTransCtx not inited"); + } else { + if (exec_info_.state_ == ObTxState::ABORT) { + // Lost data read during transfer sources from reading aborted txn. 
+ // Because of the strong log synchronization semantics of transfer, we can + // rely on the transfer src txn already being dead, with its abort + // log synchronized. + is_alive = false; + } else { + is_alive = true; + } + } + + TRANS_LOG(INFO, "handle ask tx state for 4377", K(ret), KPC(this), K(is_alive)); + return ret; +} + } // namespace transaction } // namespace oceanbase diff --git a/src/storage/tx/ob_trans_part_ctx.h b/src/storage/tx/ob_trans_part_ctx.h index 170b8f761..fa7890889 100644 --- a/src/storage/tx/ob_trans_part_ctx.h +++ b/src/storage/tx/ob_trans_part_ctx.h @@ -217,6 +217,9 @@ public: int handle_trans_ask_state_resp(const ObAskStateRespMsg &msg); int handle_trans_collect_state(ObStateInfo &state_info, const SCN &snapshot); int handle_trans_collect_state_resp(const ObCollectStateRespMsg &msg); + + // tx state check for 4377 + int handle_ask_tx_state_for_4377(bool &is_alive); public: // thread safe int64_t to_string(char* buf, const int64_t buf_len) const; diff --git a/src/storage/tx/ob_trans_rpc.cpp b/src/storage/tx/ob_trans_rpc.cpp index 805574c73..bf0a15caf 100644 --- a/src/storage/tx/ob_trans_rpc.cpp +++ b/src/storage/tx/ob_trans_rpc.cpp @@ -581,6 +581,42 @@ int ObTransRpc::post_sub_response_msg_(const ObAddr &server, ObTxMsg &msg) return ret; } +int ObTransRpc::ask_tx_state_for_4377(const ObAskTxStateFor4377Msg &msg, + ObAskTxStateFor4377RespMsg &resp) +{ + int ret = OB_SUCCESS; + + uint64_t tenant_id = trans_service_->get_tenant_id(); + int64_t cluster_id = GCONF.cluster_id; + ObAddr server; + + if (OB_UNLIKELY(!is_inited_)) { + TRANS_LOG(WARN, "ObTransRpc not inited"); + ret = OB_NOT_INIT; + } else if (OB_UNLIKELY(!is_running_)) { + TRANS_LOG(WARN, "ObTransRpc is not running"); + ret = OB_NOT_RUNNING; + } else if (OB_UNLIKELY(!is_valid_tenant_id(tenant_id)) + || OB_UNLIKELY(!msg.is_valid())) { + TRANS_LOG(WARN, "invalid argument", K(tenant_id), K(msg)); + ret = OB_INVALID_ARGUMENT; + } else if 
(OB_FAIL(trans_service_->get_location_adapter()->nonblock_get_leader(cluster_id, + tenant_id, + msg.ls_id_, + server))) { + TRANS_LOG(WARN, "get leader failed", KR(ret), K(msg), K(cluster_id), K(tenant_id)); + } else { + ret = rpc_proxy_. + to(server). + by(tenant_id). + timeout(GCONF._ob_trans_rpc_timeout). + ask_tx_state_for_4377(msg, resp); + TRANS_LOG(WARN, "ask tx state for 4377 finished", KR(ret), K(msg), K(cluster_id)); + } + + return ret; +} + int ObTransRpc::post_standby_msg_(const ObAddr &server, ObTxMsg &msg) { int ret = OB_SUCCESS; @@ -627,6 +663,30 @@ void ObTransRpc::statistics_() } } +int ObAskTxStateFor4377P::process() +{ + int ret = OB_SUCCESS; + bool is_alive = false; + transaction::ObAskTxStateFor4377Msg &msg = arg_; + transaction::ObAskTxStateFor4377RespMsg &resp = result_; + transaction::ObTransService *txs = MTL(transaction::ObTransService*); + + if (OB_ISNULL(txs)) { + ret = OB_ERR_UNEXPECTED; + TRANS_LOG(WARN, "fail to get trans service", K(ret)); + } else if (OB_FAIL(txs->handle_ask_tx_state_for_4377(msg, is_alive))) { + TRANS_LOG(WARN, "handle ask tx state for 4377 failed", K(ret), K(msg)); + } else { + TRANS_LOG(INFO, "handle ask tx state for 4377 succeed", K(ret), K(msg), K(resp)); + } + + resp.is_alive_ = is_alive; + resp.ret_ = ret; + + // We rewrite the return code to distinguish the rpc error and txn error + return OB_SUCCESS; +} + } // transaction } // oceanbase diff --git a/src/storage/tx/ob_trans_rpc.h b/src/storage/tx/ob_trans_rpc.h index fc1ef57c8..ef6c2bdb3 100644 --- a/src/storage/tx/ob_trans_rpc.h +++ b/src/storage/tx/ob_trans_rpc.h @@ -106,6 +106,8 @@ public: RPC_AP(PR3 post_msg, OB_TX_FREE_ROUTE_CHECK_ALIVE, (transaction::ObTxFreeRouteCheckAliveMsg), ObTransRpcResult); RPC_AP(PR3 post_msg, OB_TX_FREE_ROUTE_CHECK_ALIVE_RESP, (transaction::ObTxFreeRouteCheckAliveRespMsg), ObTransRpcResult); RPC_S(@PR3 sync_access, OB_TX_FREE_ROUTE_PUSH_STATE, (transaction::ObTxFreeRoutePushState), transaction::ObTxFreeRoutePushStateResp); + 
// state check for 4377 + RPC_S(@PR3 ask_tx_state_for_4377, OB_ASK_TX_STATE_FOR_4377, (transaction::ObAskTxStateFor4377Msg), transaction::ObAskTxStateFor4377RespMsg); }; #define TX_P_(name, pcode) \ @@ -373,6 +375,9 @@ public: virtual int post_msg(const share::ObLSID &p, ObTxMsg &msg) = 0; virtual int post_msg(const ObAddr &server, const ObTxFreeRouteMsg &m) = 0; virtual int sync_access(const ObAddr &server, const ObTxFreeRoutePushState &m, ObTxFreeRoutePushStateResp &result) = 0; + virtual int ask_tx_state_for_4377(const ObAskTxStateFor4377Msg &msg, + ObAskTxStateFor4377RespMsg &resp) = 0; + }; /* @@ -409,7 +414,9 @@ public: int post_msg(const share::ObLSID &p, ObTxMsg &msg); int post_msg(const ObAddr &server, const ObTxFreeRouteMsg &m); int sync_access(const ObAddr &server, const ObTxFreeRoutePushState &m, ObTxFreeRoutePushStateResp &result); -private: + int ask_tx_state_for_4377(const ObAskTxStateFor4377Msg &msg, + ObAskTxStateFor4377RespMsg &resp); + private: int post_(const ObAddr &server, ObTxMsg &msg); int post_commit_msg_(const ObAddr &server, ObTxMsg &msg); int post_sub_request_msg_(const ObAddr &server, ObTxMsg &msg); @@ -450,7 +457,11 @@ private: int64_t last_stat_ts_ CACHE_ALIGNED; }; - +class ObAskTxStateFor4377P : public obrpc::ObRpcProcessor< obrpc::ObTransRpcProxy::ObRpc > +{ +protected: + int process(); +}; } // transaction diff --git a/src/storage/tx/ob_trans_service_v4.cpp b/src/storage/tx/ob_trans_service_v4.cpp index 5fce525e9..7f0ce9f10 100755 --- a/src/storage/tx/ob_trans_service_v4.cpp +++ b/src/storage/tx/ob_trans_service_v4.cpp @@ -3534,5 +3534,73 @@ bool ObTransService::is_ls_dropped_(const share::ObLSID ls_id) { } return bret; } + +int ObTransService::ask_tx_state_for_4377(const ObLSID ls_id, + const ObTransID tx_id, + bool &is_alive) +{ + int ret = OB_SUCCESS; + + static const int64_t MAX_ALLOWED_ASK_STATE_FOR_4377_TIMES = 10 * 1000 * 1000; //10s + static const int64_t SLEEP_DURATION_FOR_ASK_STATE_FOR_4377 = 100 * 1000; //100ms + const 
int64_t start_ts = ObTimeUtility::current_time(); + ObAskTxStateFor4377Msg msg; + ObAskTxStateFor4377RespMsg resp; + msg.tx_id_ = tx_id; + msg.ls_id_ = ls_id; + + do { + if (OB_FAIL(rpc_->ask_tx_state_for_4377(msg, resp))) { + TRANS_LOG(WARN, "ask tx state for 4377 failed", K(ret)); + if (OB_LS_IS_DELETED == ret) { + is_alive = false; + ret = OB_SUCCESS; + TRANS_LOG(WARN, "ls is deleted during ask tx state", K(ret), K(msg)); + } + } else if (OB_FAIL(resp.ret_)) { + ret = resp.ret_; + } else { + is_alive = resp.is_alive_; + } + + if (OB_FAIL(ret)) { + usleep(SLEEP_DURATION_FOR_ASK_STATE_FOR_4377); + } + + if (OB_FAIL(ret) && ObTimeUtility::current_time() - start_ts >= MAX_ALLOWED_ASK_STATE_FOR_4377_TIMES) { + TRANS_LOG(WARN, "timeout for 4377 check", K(ret), K(ls_id), K(tx_id), K(start_ts)); + ret = OB_TIMEOUT; + } + } while (OB_FAIL(ret) && OB_TIMEOUT != ret); + + TRANS_LOG(INFO, "tx state check for 4377 finished", K(ls_id), K(tx_id), K(ret), K(is_alive)); + + return ret; +} + +int ObTransService::handle_ask_tx_state_for_4377(const ObAskTxStateFor4377Msg &msg, + bool &is_alive) +{ + int ret = OB_SUCCESS; + ObPartTransCtx *ctx = NULL; + is_alive = false; + + if (OB_FAIL(get_tx_ctx_(msg.ls_id_, msg.tx_id_, ctx))) { + TRANS_LOG(WARN, "fail to get tx context", K(ret), K(msg)); + if (OB_TRANS_CTX_NOT_EXIST == ret || OB_PARTITION_NOT_EXIST == ret) { + ret = OB_SUCCESS; + is_alive = false; + TRANS_LOG(WARN, "tx state is not exist for 4377", K(ret), K(msg)); + } + } else if (OB_FAIL(ctx->handle_ask_tx_state_for_4377(is_alive))) { + TRANS_LOG(WARN, "fail to handle trans ask state resp", K(ret), K(msg)); + } + + if (OB_NOT_NULL(ctx)) { + revert_tx_ctx_(ctx); + } + TRANS_LOG(INFO, "handle ask tx state for 4377", K(ret), K(msg), K(is_alive)); + return ret; +} } // transaction } // ocenabase diff --git a/src/storage/tx/ob_trans_service_v4.h b/src/storage/tx/ob_trans_service_v4.h index 6cd8de912..22f333975 100644 --- a/src/storage/tx/ob_trans_service_v4.h +++ 
b/src/storage/tx/ob_trans_service_v4.h @@ -177,6 +177,14 @@ int check_for_standby(const share::ObLSID &ls_id, void register_standby_cleanup_task(); int do_standby_cleanup(); void handle_defer_abort(ObTxDesc &tx); + +// tx state check for 4377 +int ask_tx_state_for_4377(const ObLSID ls_id, + const ObTransID tx_id, + bool &is_alive); +int handle_ask_tx_state_for_4377(const ObAskTxStateFor4377Msg &msg, + bool &is_alive); + TO_STRING_KV(K(is_inited_), K(tenant_id_), KP(this)); private: diff --git a/src/storage/tx/ob_tx_msg.cpp b/src/storage/tx/ob_tx_msg.cpp index 54cfba846..e26cd13bd 100644 --- a/src/storage/tx/ob_tx_msg.cpp +++ b/src/storage/tx/ob_tx_msg.cpp @@ -488,5 +488,23 @@ bool ObCollectStateRespMsg::is_valid() const return ret; } +OB_SERIALIZE_MEMBER(ObAskTxStateFor4377Msg, tx_id_, ls_id_); +OB_SERIALIZE_MEMBER(ObAskTxStateFor4377RespMsg, is_alive_, ret_); + +bool ObAskTxStateFor4377Msg::is_valid() const +{ + bool ret = false; + if (tx_id_.is_valid() + && ls_id_.is_valid()) { + ret = true; + } + return ret; +} + +bool ObAskTxStateFor4377RespMsg::is_valid() const +{ + return true; +} + } // transaction } // oceanbase diff --git a/src/storage/tx/ob_tx_msg.h b/src/storage/tx/ob_tx_msg.h index f3a9de77d..0d71298e7 100644 --- a/src/storage/tx/ob_tx_msg.h +++ b/src/storage/tx/ob_tx_msg.h @@ -546,6 +546,35 @@ namespace transaction || (TX_2PC_PREPARE_REDO_REQ <= msg_type && TX_2PC_PREPARE_VERSION_RESP >= msg_type); } }; + + + struct ObAskTxStateFor4377Msg + { + public: + ObAskTxStateFor4377Msg() : + tx_id_(), + ls_id_() {} + public: + ObTransID tx_id_; + share::ObLSID ls_id_; + bool is_valid() const; + TO_STRING_KV(K_(tx_id), K_(ls_id)); + OB_UNIS_VERSION(1); + }; + + struct ObAskTxStateFor4377RespMsg + { + public: + ObAskTxStateFor4377RespMsg() : + is_alive_(false), + ret_(OB_SUCCESS) {} + public: + bool is_alive_; + int ret_; + bool is_valid() const; + TO_STRING_KV(K_(is_alive), K_(ret)); + OB_UNIS_VERSION(1); + }; } } diff --git 
a/unittest/storage/tx/mock_utils/ob_fake_tx_rpc.h b/unittest/storage/tx/mock_utils/ob_fake_tx_rpc.h index ae5612e48..80559e2fb 100644 --- a/unittest/storage/tx/mock_utils/ob_fake_tx_rpc.h +++ b/unittest/storage/tx/mock_utils/ob_fake_tx_rpc.h @@ -162,6 +162,11 @@ public: OZ(result.deserialize(resp.ptr(), resp.length(), pos)); return ret; } + int ask_tx_state_for_4377(const ObAskTxStateFor4377Msg &msg, + ObAskTxStateFor4377RespMsg &resp) + { + return OB_SUCCESS; + } template int send_msg_callback(const ObAddr &recv, const ObTxMsg &msg,