[fix] the new leader do not degrade crashed server when arbitration_timeout is larger than 10s

This commit is contained in:
BinChenn
2023-10-13 05:40:00 +00:00
committed by ob-robot
parent c7a7d23cfc
commit 9cb41a41d6
9 changed files with 57 additions and 3 deletions

View File

@ -677,6 +677,21 @@ int ret = OB_SUCCESS;
}
}
int ObNetKeepAlive::get_last_resp_ts(const common::ObAddr &addr, int64_t &last_resp_ts)
{
int ret = OB_SUCCESS;
last_resp_ts = OB_INVALID_TIMESTAMP;
easy_addr_t ez_addr = to_ez_addr(addr);
DestKeepAliveState *rs = regist_dest_if_need(ez_addr);
if (rs != NULL) {
last_resp_ts = ATOMIC_LOAD(&rs->last_read_ts_);
} else {
ret = OB_ERR_UNEXPECTED;
}
return ret;
}
}//end of namespace obrpc
}//end of namespace oceanbase

View File

@ -66,6 +66,7 @@ public:
int in_black(const easy_addr_t &addr, bool &in_blacklist, ObNetKeepAliveData *ka_data);
int in_black(const common::ObAddr &addr, bool &in_blacklist, ObNetKeepAliveData *ka_data);
virtual bool in_black(const easy_addr_t &addr);
int get_last_resp_ts(const common::ObAddr &addr, int64_t &last_resp_ts);
private:
void do_server_loop();
void do_client_loop();

View File

@ -64,7 +64,8 @@ bool MockNetKeepAliveAdapter::in_black_or_stopped(const common::ObAddr &server)
bool MockNetKeepAliveAdapter::is_server_stopped(const common::ObAddr &server)
{
return log_deliver_->need_filter_packet_by_blacklist(server);
UNUSED(server);
return false;
}
bool MockNetKeepAliveAdapter::in_black(const common::ObAddr &server)
@ -72,6 +73,17 @@ bool MockNetKeepAliveAdapter::in_black(const common::ObAddr &server)
return log_deliver_->need_filter_packet_by_blacklist(server);
}
int MockNetKeepAliveAdapter::get_last_resp_ts(const common::ObAddr &server,
int64_t &last_resp_ts)
{
if (log_deliver_->need_filter_packet_by_blacklist(server)) {
last_resp_ts = 1;
} else {
last_resp_ts = common::ObTimeUtility::current_time();
}
return OB_SUCCESS;
}
uint32_t get_local_addr(const char *dev_name)
{
int fd, intrface;

View File

@ -78,6 +78,7 @@ public:
bool in_black_or_stopped(const common::ObAddr &server) override final;
bool is_server_stopped(const common::ObAddr &server) override final;
bool in_black(const common::ObAddr &server) override final;
int get_last_resp_ts(const common::ObAddr &server, int64_t &last_resp_ts) override final;
private:
unittest::ObLogDeliver *log_deliver_;
};

View File

@ -22,12 +22,14 @@ namespace oceanbase
{
using namespace logservice;
int64_t ARB_TIMEOUT_ARG = 2 * 1000 * 1000L;
namespace logservice
{
void ObArbitrationService::update_arb_timeout_()
{
arb_timeout_us_ = 2 * 1000 * 1000L;
arb_timeout_us_ = ARB_TIMEOUT_ARG;
if (REACH_TIME_INTERVAL(2 * 1000 * 1000)) {
CLOG_LOG_RET(WARN, OB_ERR_UNEXPECTED, "update_arb_timeout_", K_(self), K_(arb_timeout_us));
}
@ -213,6 +215,7 @@ TEST_F(TestObSimpleLogClusterArbService, test_2f1a_reconfirm_degrade_upgrade)
palf_list[another_f_idx]->palf_handle_impl_->set_location_cache_cb(&loc_cb);
// block net of old leader, new leader will be elected
// and degrade in RECONFIRM state
ARB_TIMEOUT_ARG = 15 * 1000 * 1000;
block_net(leader_idx, another_f_idx);
block_net(leader_idx, arb_replica_idx);
// block_net后会理解进行降级操作,导致旧主上有些单副本写成功的日志被committed
@ -234,6 +237,7 @@ TEST_F(TestObSimpleLogClusterArbService, test_2f1a_reconfirm_degrade_upgrade)
leader.reset();
new_leader.reset();
delete_paxos_group(id);
ARB_TIMEOUT_ARG = 2 * 1000 * 1000;
PALF_LOG(INFO, "end test_2f1a_reconfirm_degrade_upgrade", K(id));
}

View File

@ -738,7 +738,7 @@ TEST_F(TestObSimpleLogClusterArbMockEleService, test_2f1a_disk_full_reconfirm)
srv->set_leader(id, addr1);
}
EXPECT_UNTIL_EQ(false, a_handle->palf_handle_impl_->state_mgr_.is_leader_active());
global_timeous_us = 5 * 1000 * 1000;
global_timeous_us = 15 * 1000 * 1000;
dynamic_cast<ObSimpleLogServer*>(get_cluster()[leader_idx])->log_service_.get_arbitration_service()->start();
for (auto srv: get_cluster()) {

View File

@ -84,5 +84,22 @@ bool ObNetKeepAliveAdapter::in_black(const common::ObAddr &server)
}
return bool_ret;
}
int ObNetKeepAliveAdapter::get_last_resp_ts(const common::ObAddr &server,
int64_t &last_resp_ts)
{
int ret = OB_SUCCESS;
if (!server.is_valid()) {
ret = OB_INVALID_ARGUMENT;
CLOG_LOG(WARN, "invalid argument", K(server));
} else if (OB_FAIL(net_keepalive_->get_last_resp_ts(server, last_resp_ts))) {
CLOG_LOG(WARN, "get_last_resp_ts failed", K(ret), K(server));
} else {
if (REACH_TIME_INTERVAL(1 * 1000 * 1000)) {
CLOG_LOG(TRACE, "get_last_resp_ts", K(server), K(last_resp_ts));
}
}
return ret;
}
} // end namespace logservice
} // end namespace oceanbase

View File

@ -10,6 +10,7 @@
#ifndef OCEANBASE_LOGSERVICE_OB_NET_KEEPALIVE_ADPATER_H_
#define OCEANBASE_LOGSERVICE_OB_NET_KEEPALIVE_ADPATER_H_
#include <stdint.h> // for int64_t etc.
namespace oceanbase
{
@ -30,6 +31,7 @@ public:
virtual bool in_black_or_stopped(const common::ObAddr &server) = 0;
virtual bool is_server_stopped(const common::ObAddr &server) = 0;
virtual bool in_black(const common::ObAddr &server) = 0;
virtual int get_last_resp_ts(const common::ObAddr &server, int64_t &last_resp_ts) = 0;
};
class ObNetKeepAliveAdapter : public IObNetKeepAliveAdapter {
@ -39,6 +41,7 @@ public:
bool in_black_or_stopped(const common::ObAddr &server) override final;
bool is_server_stopped(const common::ObAddr &server) override final;
bool in_black(const common::ObAddr &server) override final;
int get_last_resp_ts(const common::ObAddr &server, int64_t &last_resp_ts) override final;
private:
int in_black_or_stopped_(const common::ObAddr &server,
bool &in_black,

View File

@ -29,6 +29,7 @@ public:
bool in_black_or_stopped(const common::ObAddr &server) override final {return false;}
bool is_server_stopped(const common::ObAddr &server) override final {return false;}
bool in_black(const common::ObAddr &server) override final {return false;}
int get_last_resp_ts(const common::ObAddr &server, int64_t &last_resp_ts) { return OB_SUCCESS; }
};
const ObAddr addr1(ObAddr::IPV4, "127.0.0.1", 1000);