diff --git a/deps/oblib/src/rpc/obrpc/ob_net_keepalive.cpp b/deps/oblib/src/rpc/obrpc/ob_net_keepalive.cpp index fce69d58f7..7645ca9ed6 100644 --- a/deps/oblib/src/rpc/obrpc/ob_net_keepalive.cpp +++ b/deps/oblib/src/rpc/obrpc/ob_net_keepalive.cpp @@ -677,6 +677,21 @@ int ret = OB_SUCCESS; } } +int ObNetKeepAlive::get_last_resp_ts(const common::ObAddr &addr, int64_t &last_resp_ts) +{ + int ret = OB_SUCCESS; + last_resp_ts = OB_INVALID_TIMESTAMP; + + easy_addr_t ez_addr = to_ez_addr(addr); + DestKeepAliveState *rs = regist_dest_if_need(ez_addr); + if (rs != NULL) { + last_resp_ts = ATOMIC_LOAD(&rs->last_read_ts_); + } else { + ret = OB_ERR_UNEXPECTED; + } + return ret; +} + }//end of namespace obrpc }//end of namespace oceanbase diff --git a/deps/oblib/src/rpc/obrpc/ob_net_keepalive.h b/deps/oblib/src/rpc/obrpc/ob_net_keepalive.h index 466b00aad0..f8f5d9a801 100644 --- a/deps/oblib/src/rpc/obrpc/ob_net_keepalive.h +++ b/deps/oblib/src/rpc/obrpc/ob_net_keepalive.h @@ -66,6 +66,7 @@ public: int in_black(const easy_addr_t &addr, bool &in_blacklist, ObNetKeepAliveData *ka_data); int in_black(const common::ObAddr &addr, bool &in_blacklist, ObNetKeepAliveData *ka_data); virtual bool in_black(const easy_addr_t &addr); + int get_last_resp_ts(const common::ObAddr &addr, int64_t &last_resp_ts); private: void do_server_loop(); void do_client_loop(); diff --git a/mittest/logservice/env/ob_simple_log_server.cpp b/mittest/logservice/env/ob_simple_log_server.cpp index f7df7b9e64..e31e7c6b88 100644 --- a/mittest/logservice/env/ob_simple_log_server.cpp +++ b/mittest/logservice/env/ob_simple_log_server.cpp @@ -64,7 +64,8 @@ bool MockNetKeepAliveAdapter::in_black_or_stopped(const common::ObAddr &server) bool MockNetKeepAliveAdapter::is_server_stopped(const common::ObAddr &server) { - return log_deliver_->need_filter_packet_by_blacklist(server); + UNUSED(server); + return false; } bool MockNetKeepAliveAdapter::in_black(const common::ObAddr &server) @@ -72,6 +73,17 @@ bool MockNetKeepAliveAdapter::in_black(const common::ObAddr &server) return log_deliver_->need_filter_packet_by_blacklist(server); } +int MockNetKeepAliveAdapter::get_last_resp_ts(const common::ObAddr &server, + int64_t &last_resp_ts) +{ + if (log_deliver_->need_filter_packet_by_blacklist(server)) { + last_resp_ts = 1; + } else { + last_resp_ts = common::ObTimeUtility::current_time(); + } + return OB_SUCCESS; +} + uint32_t get_local_addr(const char *dev_name) { int fd, intrface; diff --git a/mittest/logservice/env/ob_simple_log_server.h b/mittest/logservice/env/ob_simple_log_server.h index ce3b1d2620..b34018c1c3 100644 --- a/mittest/logservice/env/ob_simple_log_server.h +++ b/mittest/logservice/env/ob_simple_log_server.h @@ -78,6 +78,7 @@ public: bool in_black_or_stopped(const common::ObAddr &server) override final; bool is_server_stopped(const common::ObAddr &server) override final; bool in_black(const common::ObAddr &server) override final; + int get_last_resp_ts(const common::ObAddr &server, int64_t &last_resp_ts) override final; private: unittest::ObLogDeliver *log_deliver_; }; diff --git a/mittest/logservice/test_ob_simple_log_arb.cpp b/mittest/logservice/test_ob_simple_log_arb.cpp index 88eedc7d39..be58ace146 100755 --- a/mittest/logservice/test_ob_simple_log_arb.cpp +++ b/mittest/logservice/test_ob_simple_log_arb.cpp @@ -22,12 +22,14 @@ namespace oceanbase { using namespace logservice; +int64_t ARB_TIMEOUT_ARG = 2 * 1000 * 1000L; + namespace logservice { void ObArbitrationService::update_arb_timeout_() { - arb_timeout_us_ = 2 * 1000 * 1000L; + arb_timeout_us_ = ARB_TIMEOUT_ARG; if (REACH_TIME_INTERVAL(2 * 1000 * 1000)) { CLOG_LOG_RET(WARN, OB_ERR_UNEXPECTED, "update_arb_timeout_", K_(self), K_(arb_timeout_us)); } @@ -213,6 +215,7 @@ TEST_F(TestObSimpleLogClusterArbService, test_2f1a_reconfirm_degrade_upgrade) palf_list[another_f_idx]->palf_handle_impl_->set_location_cache_cb(&loc_cb); // block net of old leader, new leader will be elected // and degrade in RECONFIRM state + ARB_TIMEOUT_ARG = 15 * 1000 * 1000; block_net(leader_idx, another_f_idx); block_net(leader_idx, arb_replica_idx); // block_net后会理解进行降级操作,导致旧主上有些单副本写成功的日志被committed @@ -234,6 +237,7 @@ TEST_F(TestObSimpleLogClusterArbService, test_2f1a_reconfirm_degrade_upgrade) leader.reset(); new_leader.reset(); delete_paxos_group(id); + ARB_TIMEOUT_ARG = 2 * 1000 * 1000; PALF_LOG(INFO, "end test_2f1a_reconfirm_degrade_upgrade", K(id)); } diff --git a/mittest/logservice/test_ob_simple_log_arb_mock_ele.cpp b/mittest/logservice/test_ob_simple_log_arb_mock_ele.cpp index fd34b61484..a4a67272ff 100755 --- a/mittest/logservice/test_ob_simple_log_arb_mock_ele.cpp +++ b/mittest/logservice/test_ob_simple_log_arb_mock_ele.cpp @@ -738,7 +738,7 @@ TEST_F(TestObSimpleLogClusterArbMockEleService, test_2f1a_disk_full_reconfirm) srv->set_leader(id, addr1); } EXPECT_UNTIL_EQ(false, a_handle->palf_handle_impl_->state_mgr_.is_leader_active()); - global_timeous_us = 5 * 1000 * 1000; + global_timeous_us = 15 * 1000 * 1000; dynamic_cast(get_cluster()[leader_idx])->log_service_.get_arbitration_service()->start(); for (auto srv: get_cluster()) { diff --git a/src/logservice/ob_net_keepalive_adapter.cpp b/src/logservice/ob_net_keepalive_adapter.cpp index c682017601..8478c5542f 100644 --- a/src/logservice/ob_net_keepalive_adapter.cpp +++ b/src/logservice/ob_net_keepalive_adapter.cpp @@ -84,5 +84,22 @@ bool ObNetKeepAliveAdapter::in_black(const common::ObAddr &server) } return bool_ret; } + +int ObNetKeepAliveAdapter::get_last_resp_ts(const common::ObAddr &server, + int64_t &last_resp_ts) +{ + int ret = OB_SUCCESS; + if (!server.is_valid()) { + ret = OB_INVALID_ARGUMENT; + CLOG_LOG(WARN, "invalid argument", K(server)); + } else if (OB_FAIL(net_keepalive_->get_last_resp_ts(server, last_resp_ts))) { + CLOG_LOG(WARN, "get_last_resp_ts failed", K(ret), K(server)); + } else { + if (REACH_TIME_INTERVAL(1 * 1000 * 1000)) { + CLOG_LOG(TRACE, "get_last_resp_ts", K(server), K(last_resp_ts)); + } + } + return ret; +} } // end namespace logservice } // end namespace oceanbase diff --git a/src/logservice/ob_net_keepalive_adapter.h b/src/logservice/ob_net_keepalive_adapter.h index d141776c57..5376dafb8e 100644 --- a/src/logservice/ob_net_keepalive_adapter.h +++ b/src/logservice/ob_net_keepalive_adapter.h @@ -10,6 +10,7 @@ #ifndef OCEANBASE_LOGSERVICE_OB_NET_KEEPALIVE_ADPATER_H_ #define OCEANBASE_LOGSERVICE_OB_NET_KEEPALIVE_ADPATER_H_ +#include // for int64_t etc. namespace oceanbase { @@ -30,6 +31,7 @@ public: virtual bool in_black_or_stopped(const common::ObAddr &server) = 0; virtual bool is_server_stopped(const common::ObAddr &server) = 0; virtual bool in_black(const common::ObAddr &server) = 0; + virtual int get_last_resp_ts(const common::ObAddr &server, int64_t &last_resp_ts) = 0; }; class ObNetKeepAliveAdapter : public IObNetKeepAliveAdapter { @@ -39,6 +41,7 @@ public: bool in_black_or_stopped(const common::ObAddr &server) override final; bool is_server_stopped(const common::ObAddr &server) override final; bool in_black(const common::ObAddr &server) override final; + int get_last_resp_ts(const common::ObAddr &server, int64_t &last_resp_ts) override final; private: int in_black_or_stopped_(const common::ObAddr &server, bool &in_black, diff --git a/unittest/logservice/test_ob_arbitration_service.cpp b/unittest/logservice/test_ob_arbitration_service.cpp index 8db309eb60..eb85cbe779 100644 --- a/unittest/logservice/test_ob_arbitration_service.cpp +++ b/unittest/logservice/test_ob_arbitration_service.cpp @@ -29,6 +29,7 @@ public: bool in_black_or_stopped(const common::ObAddr &server) override final {return false;} bool is_server_stopped(const common::ObAddr &server) override final {return false;} bool in_black(const common::ObAddr &server) override final {return false;} + int get_last_resp_ts(const common::ObAddr &server, int64_t &last_resp_ts) { return OB_SUCCESS; } }; const ObAddr addr1(ObAddr::IPV4, "127.0.0.1", 1000);