[fix] fix leader's committed_end_lsn can not be updated
This commit is contained in:
parent
18209e0665
commit
6d94f30456
@ -35,7 +35,8 @@ ob_unittest_clog(test_ob_simple_log_throttling test_ob_simple_log_throttling.cp
|
||||
ob_unittest_clog(test_ob_simple_log_throttling_member_change test_ob_simple_log_throttling_member_change.cpp)
|
||||
ob_unittest_clog(test_ob_simple_log_throttling_arb test_ob_simple_log_throttling_arb.cpp)
|
||||
ob_unittest_clog(test_ob_simple_log_config_change_mock_ele test_ob_simple_log_config_change_mock_ele.cpp)
|
||||
#ob_unittest_clog(test_ob_simple_log_arb_mock_ele test_ob_simple_log_arb_mock_ele.cpp) TODO(yunlong.cb): fix it
|
||||
ob_unittest_clog(test_ob_simple_log_arb_mock_ele test_ob_simple_log_arb_mock_ele.cpp)
|
||||
ob_unittest_clog(test_ob_simple_log_flashback_arb test_ob_simple_log_flashback_arb.cpp)
|
||||
|
||||
|
||||
add_subdirectory(archiveservice)
|
||||
|
@ -1364,5 +1364,34 @@ void ObSimpleLogClusterTestEnv::set_disk_options_for_throttling(PalfEnvImpl &pal
|
||||
palf_env_impl.disk_options_wrapper_.set_cur_unrecyclable_log_disk_size(unrecyclable_size);
|
||||
}
|
||||
|
||||
bool ObSimpleLogClusterTestEnv::is_degraded(const PalfHandleImplGuard &leader,
|
||||
const int64_t degraded_server_idx)
|
||||
{
|
||||
bool has_degraded = false;
|
||||
while (!has_degraded) {
|
||||
common::GlobalLearnerList degraded_learner_list;
|
||||
leader.palf_handle_impl_->config_mgr_.get_degraded_learner_list(degraded_learner_list);
|
||||
has_degraded = degraded_learner_list.contains(get_cluster()[degraded_server_idx]->get_addr());
|
||||
sleep(1);
|
||||
PALF_LOG(INFO, "wait degrade");
|
||||
}
|
||||
return has_degraded;
|
||||
}
|
||||
|
||||
// Keeps submitting one log per round to `leader` until its degraded learner
// list is empty.
//
// @param leader   guard of the leader replica that receives the logs and is queried
// @param palf_id  id handed to submit_log()
// @return always true; the loop has no timeout and only ends once the list is empty
bool ObSimpleLogClusterTestEnv::is_upgraded(PalfHandleImplGuard &leader, const int64_t palf_id)
{
  bool has_upgraded = false;
  // One log is submitted up front, then one more at the start of every round.
  EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, palf_id));
  do {
    EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, palf_id));
    common::GlobalLearnerList degraded_learners;
    leader.palf_handle_impl_->config_mgr_.get_degraded_learner_list(degraded_learners);
    has_upgraded = (degraded_learners.get_member_number() == 0);
    // The one-second pause and the log line follow every probe, the successful one included.
    sleep(1);
    PALF_LOG(INFO, "wait upgrade");
  } while (!has_upgraded);
  return has_upgraded;
}
|
||||
|
||||
} // end namespace unittest
|
||||
} // end namespace oceanbase
|
||||
|
@ -243,6 +243,8 @@ public:
|
||||
void switch_append_to_flashback(PalfHandleImplGuard &leader, int64_t &mode_version);
|
||||
void switch_flashback_to_append(PalfHandleImplGuard &leader, int64_t &mode_version);
|
||||
void set_disk_options_for_throttling(PalfEnvImpl &palf_env_impl);
|
||||
bool is_degraded(const PalfHandleImplGuard &leader, const int64_t degraded_server_idx);
|
||||
bool is_upgraded(PalfHandleImplGuard &leader, const int64_t palf_id);
|
||||
public:
|
||||
static int64_t palf_id_;
|
||||
private:
|
||||
|
@ -716,6 +716,8 @@ TEST_F(TestObSimpleLogClusterArbService, test_2f1a_degrade_when_no_leader)
|
||||
|
||||
EXPECT_EQ(leader.palf_handle_impl_->self_, new_leader.palf_handle_impl_->self_);
|
||||
|
||||
// waiting for upgrading
|
||||
is_upgraded(leader, id);
|
||||
EXPECT_EQ(OB_SUCCESS, new_leader.palf_handle_impl_->config_mgr_.get_log_sync_member_list( \
|
||||
leader_member_list, leader_replica_num));
|
||||
EXPECT_EQ(2, leader_member_list.get_member_number());
|
||||
|
@ -42,34 +42,6 @@ class TestObSimpleLogClusterArbMockEleService : public ObSimpleLogClusterTestEnv
|
||||
public:
|
||||
TestObSimpleLogClusterArbMockEleService() : ObSimpleLogClusterTestEnv()
|
||||
{}
|
||||
bool is_degraded(const PalfHandleImplGuard &leader,
|
||||
const int64_t degraded_server_idx)
|
||||
{
|
||||
bool has_degraded = false;
|
||||
while (!has_degraded) {
|
||||
common::GlobalLearnerList degraded_learner_list;
|
||||
leader.palf_handle_impl_->config_mgr_.get_degraded_learner_list(degraded_learner_list);
|
||||
has_degraded = degraded_learner_list.contains(get_cluster()[degraded_server_idx]->get_addr());
|
||||
sleep(1);
|
||||
PALF_LOG(INFO, "wait degrade");
|
||||
}
|
||||
return has_degraded;
|
||||
}
|
||||
|
||||
// Submits one log per round to `leader` and waits, without a timeout, until
// its degraded learner list is empty; returns true once it is.
bool is_upgraded(PalfHandleImplGuard &leader, const int64_t palf_id)
{
  // Initial log before the polling rounds start.
  EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, palf_id));
  for (;;) {
    EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, palf_id));
    common::GlobalLearnerList learners;
    leader.palf_handle_impl_->config_mgr_.get_degraded_learner_list(learners);
    const bool none_degraded = (0 == learners.get_member_number());
    // Pause and log after each probe, before deciding whether to stop.
    sleep(1);
    PALF_LOG(INFO, "wait upgrade");
    if (none_degraded) {
      return true;
    }
  }
}
|
||||
};
|
||||
|
||||
int64_t ObSimpleLogClusterTestBase::member_cnt_ = 3;
|
||||
@ -153,7 +125,7 @@ TEST_F(TestObSimpleLogClusterArbMockEleService, switch_leader_during_degrading)
|
||||
|
||||
TimeoutChecker not_timeout(TIMEOUT_US);
|
||||
bool is_already_finished = false;
|
||||
LogConfigInfo new_config_info;
|
||||
LogConfigInfoV2 new_config_info;
|
||||
EXPECT_EQ(OB_SUCCESS, leader.palf_handle_impl_->check_args_and_generate_config_(upgrade_b_args, upgrade_b_pid, upgrade_b_ele_epoch,
|
||||
is_already_finished, new_config_info));
|
||||
EXPECT_UNTIL_EQ(OB_SUCCESS, leader.palf_handle_impl_->wait_log_barrier_(upgrade_b_args, new_config_info, not_timeout));
|
||||
@ -269,7 +241,7 @@ TEST_F(TestObSimpleLogClusterArbMockEleService, switch_leader_to_other_during_de
|
||||
|
||||
TimeoutChecker not_timeout(TIMEOUT_US);
|
||||
bool is_already_finished = false;
|
||||
LogConfigInfo new_config_info;
|
||||
LogConfigInfoV2 new_config_info;
|
||||
EXPECT_EQ(OB_SUCCESS, leader.palf_handle_impl_->check_args_and_generate_config_(upgrade_a_args, upgrade_a_pid, upgrade_a_ele_epoch,
|
||||
is_already_finished, new_config_info));
|
||||
EXPECT_UNTIL_EQ(OB_SUCCESS, leader.palf_handle_impl_->wait_log_barrier_(upgrade_a_args, new_config_info, not_timeout));
|
||||
@ -419,7 +391,9 @@ TEST_F(TestObSimpleLogClusterArbMockEleService, test_2f1a_change_config_fail)
|
||||
PalfHandleImplGuard *b_handle = palf_list[b_idx];
|
||||
PalfHandleImplGuard *d_handle = palf_list[d_idx];
|
||||
|
||||
LogConfigChangeArgs add_d_arg(common::ObMember(d_addr, 1), 4, ADD_MEMBER);
|
||||
LogConfigVersion config_version;
|
||||
EXPECT_EQ(OB_SUCCESS, leader.palf_handle_impl_->get_config_version(config_version));
|
||||
LogConfigChangeArgs add_d_arg(common::ObMember(d_addr, 1), 4, config_version, ADD_MEMBER);
|
||||
int64_t add_d_pid = 0;
|
||||
int64_t add_d_epoch = 0;
|
||||
LogConfigVersion add_d_version;
|
||||
@ -430,7 +404,7 @@ TEST_F(TestObSimpleLogClusterArbMockEleService, test_2f1a_change_config_fail)
|
||||
TimeoutChecker not_timeout(TIMEOUT_US);
|
||||
bool is_already_finished = false;
|
||||
bool has_new_version = true;
|
||||
LogConfigInfo new_config_info;
|
||||
LogConfigInfoV2 new_config_info;
|
||||
EXPECT_EQ(OB_SUCCESS, leader.palf_handle_impl_->check_args_and_generate_config_(add_d_arg, add_d_pid, add_d_epoch,
|
||||
is_already_finished, new_config_info));
|
||||
EXPECT_EQ(OB_EAGAIN, leader.palf_handle_impl_->config_mgr_.check_follower_sync_status(add_d_arg, new_config_info, has_new_version));
|
||||
@ -517,8 +491,11 @@ TEST_F(TestObSimpleLogClusterArbMockEleService, test_arb_degrade_probe)
|
||||
PalfHandleImplGuard leader;
|
||||
std::vector<PalfHandleImplGuard*> palf_list;
|
||||
EXPECT_EQ(OB_SUCCESS, create_paxos_group_with_arb_mock_election(id, arb_replica_idx, leader_idx, leader));
|
||||
EXPECT_EQ(OB_SUCCESS, leader.palf_handle_impl_->add_member(ObMember(get_cluster()[3]->get_addr(), 1), 3, CONFIG_CHANGE_TIMEOUT));
|
||||
EXPECT_EQ(OB_SUCCESS, leader.palf_handle_impl_->add_member(ObMember(get_cluster()[4]->get_addr(), 1), 4, CONFIG_CHANGE_TIMEOUT));
|
||||
LogConfigVersion config_version;
|
||||
EXPECT_EQ(OB_SUCCESS, leader.palf_handle_impl_->get_config_version(config_version));
|
||||
EXPECT_EQ(OB_SUCCESS, leader.palf_handle_impl_->add_member(ObMember(get_cluster()[3]->get_addr(), 1), 3, config_version, CONFIG_CHANGE_TIMEOUT));
|
||||
EXPECT_EQ(OB_SUCCESS, leader.palf_handle_impl_->get_config_version(config_version));
|
||||
EXPECT_EQ(OB_SUCCESS, leader.palf_handle_impl_->add_member(ObMember(get_cluster()[4]->get_addr(), 1), 4, config_version, CONFIG_CHANGE_TIMEOUT));
|
||||
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 200, id));
|
||||
EXPECT_EQ(OB_SUCCESS, get_cluster_palf_handle_guard(id, palf_list));
|
||||
dynamic_cast<ObSimpleLogServer*>(get_cluster()[leader_idx])->log_service_.get_arbitration_service()->start();
|
||||
@ -585,8 +562,8 @@ TEST_F(TestObSimpleLogClusterArbMockEleService, test_add_remove_lose_logs)
|
||||
const int64_t id = ATOMIC_AAF(&palf_id_, 1);
|
||||
const int64_t CONFIG_CHANGE_TIMEOUT = 10 * 1000 * 1000L;
|
||||
OB_LOGGER.set_log_level("TRACE");
|
||||
SET_CASE_LOG_FILE(TEST_NAME, "test_arb_degrade_probe");
|
||||
PALF_LOG(INFO, "begin test test_arb_degrade_probe", K(id));
|
||||
SET_CASE_LOG_FILE(TEST_NAME, "test_add_remove_lose_logs");
|
||||
PALF_LOG(INFO, "begin test test_add_remove_lose_logs", K(id));
|
||||
{
|
||||
int64_t leader_idx = 0;
|
||||
int64_t arb_replica_idx = 0;
|
||||
@ -594,8 +571,11 @@ TEST_F(TestObSimpleLogClusterArbMockEleService, test_add_remove_lose_logs)
|
||||
std::vector<PalfHandleImplGuard*> palf_list;
|
||||
EXPECT_EQ(OB_SUCCESS, create_paxos_group_with_arb_mock_election(id, arb_replica_idx, leader_idx, leader));
|
||||
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 200, id));
|
||||
EXPECT_EQ(OB_SUCCESS, leader.palf_handle_impl_->add_member(ObMember(get_cluster()[3]->get_addr(), 1), 3, CONFIG_CHANGE_TIMEOUT));
|
||||
EXPECT_EQ(OB_SUCCESS, leader.palf_handle_impl_->add_member(ObMember(get_cluster()[4]->get_addr(), 1), 4, CONFIG_CHANGE_TIMEOUT));
|
||||
LogConfigVersion config_version;
|
||||
EXPECT_EQ(OB_SUCCESS, leader.palf_handle_impl_->get_config_version(config_version));
|
||||
EXPECT_EQ(OB_SUCCESS, leader.palf_handle_impl_->add_member(ObMember(get_cluster()[3]->get_addr(), 1), 3, config_version, CONFIG_CHANGE_TIMEOUT));
|
||||
EXPECT_EQ(OB_SUCCESS, leader.palf_handle_impl_->get_config_version(config_version));
|
||||
EXPECT_EQ(OB_SUCCESS, leader.palf_handle_impl_->add_member(ObMember(get_cluster()[4]->get_addr(), 1), 4, config_version, CONFIG_CHANGE_TIMEOUT));
|
||||
EXPECT_EQ(OB_SUCCESS, get_cluster_palf_handle_guard(id, palf_list));
|
||||
dynamic_cast<ObSimpleLogServer*>(get_cluster()[leader_idx])->log_service_.get_arbitration_service()->stop();
|
||||
|
||||
@ -629,7 +609,7 @@ TEST_F(TestObSimpleLogClusterArbMockEleService, test_add_remove_lose_logs)
|
||||
|
||||
TimeoutChecker not_timeout(CONFIG_CHANGE_TIMEOUT);
|
||||
bool is_already_finished = false;
|
||||
LogConfigInfo new_config_info;
|
||||
LogConfigInfoV2 new_config_info;
|
||||
EXPECT_EQ(OB_SUCCESS, leader.palf_handle_impl_->check_args_and_generate_config_(remove_e_args, remove_e_pid, remove_e_ele_epoch,
|
||||
is_already_finished, new_config_info));
|
||||
EXPECT_UNTIL_EQ(OB_SUCCESS, leader.palf_handle_impl_->wait_log_barrier_(remove_e_args, new_config_info, not_timeout));
|
||||
@ -656,7 +636,7 @@ TEST_F(TestObSimpleLogClusterArbMockEleService, test_add_remove_lose_logs)
|
||||
revert_cluster_palf_handle_guard(palf_list);
|
||||
}
|
||||
delete_paxos_group(id);
|
||||
PALF_LOG(INFO, "end test test_arb_degrade_probe", K(id));
|
||||
PALF_LOG(INFO, "end test test_add_remove_lose_logs", K(id));
|
||||
}
|
||||
|
||||
} // end unittest
|
||||
|
@ -257,6 +257,63 @@ TEST_F(TestObSimpleLogClusterConfigChangeMockEle, switch_leader_during_removing_
|
||||
PALF_LOG(INFO, "end test switch_leader_during_removing_member3", K(id));
|
||||
}
|
||||
|
||||
// 1. remove D from member list (ABCD) and match_lsn_map, committed_end_lsn is 100 and last_submit_end_lsn is 200.
|
||||
// 2. because committed_end_lsn is smaller than last_submit_end_lsn, the leader will use prev_member_list (ABCD) to generate committed_end_lsn
|
||||
// 3. because D is no longer in match_lsn_map, the leader may be unable to generate committed_end_lsn
|
||||
// Checks that the leader can still advance committed_end_lsn after a member
// was removed while submitted logs were not yet committed.
// Flow: add D, block OB_LOG_PUSH_RESP on the leader so nothing commits,
// submit logs, remove D, unblock, then wait for committed_end_lsn_ to reach
// last_submit_end_lsn_.
TEST_F(TestObSimpleLogClusterConfigChangeMockEle, test_committed_end_lsn_after_removing_member)
{
  int ret = OB_SUCCESS;
  const int64_t id = ATOMIC_AAF(&palf_id_, 1);
  const int64_t CONFIG_CHANGE_TIMEOUT = 10 * 1000 * 1000L; // 10s
  SET_CASE_LOG_FILE(TEST_NAME, "test_committed_end_lsn_after_removing_member");
  PALF_LOG(INFO, "begin test test_committed_end_lsn_after_removing_member", K(id));
  {
    int64_t leader_idx = 0;
    PalfHandleImplGuard leader;
    std::vector<PalfHandleImplGuard*> palf_list;
    EXPECT_EQ(OB_SUCCESS, create_paxos_group_with_mock_election(id, leader_idx, leader));
    EXPECT_EQ(OB_SUCCESS, submit_log(leader, 200, id));
    EXPECT_EQ(OB_SUCCESS, get_cluster_palf_handle_guard(id, palf_list));

    // D is the server three slots after the leader; the arithmetic assumes four servers.
    const int64_t d_idx = (leader_idx + 3) % 4;
    const common::ObAddr d_addr = get_cluster()[d_idx]->get_addr();
    LogConfigVersion config_version;
    EXPECT_EQ(OB_SUCCESS, leader.palf_handle_impl_->get_config_version(config_version));
    EXPECT_EQ(OB_SUCCESS, leader.palf_handle_impl_->add_member(common::ObMember(d_addr, 1), 4, config_version, CONFIG_CHANGE_TIMEOUT));

    // 1. leader can not commit logs
    block_pcode(leader_idx, ObRpcPacketCode::OB_LOG_PUSH_RESP);
    EXPECT_EQ(OB_SUCCESS, submit_log(leader, 100, id));
    sleep(1);
    EXPECT_GT(leader.palf_handle_impl_->sw_.last_submit_lsn_, leader.palf_handle_impl_->sw_.committed_end_lsn_);

    // 2. remove D
    EXPECT_EQ(OB_SUCCESS, leader.palf_handle_impl_->remove_member(common::ObMember(d_addr, 1), 3, CONFIG_CHANGE_TIMEOUT));
    EXPECT_GT(leader.palf_handle_impl_->sw_.last_submit_lsn_, leader.palf_handle_impl_->sw_.committed_end_lsn_);
    // The reconfig barrier must sit at the end of the submitted logs, ahead of the committed point.
    EXPECT_EQ(leader.palf_handle_impl_->config_mgr_.reconfig_barrier_.prev_end_lsn_, leader.palf_handle_impl_->sw_.last_submit_end_lsn_);
    EXPECT_GT(leader.palf_handle_impl_->config_mgr_.reconfig_barrier_.prev_end_lsn_, leader.palf_handle_impl_->sw_.committed_end_lsn_);

    // 3. leader can commit logs
    unblock_pcode(leader_idx, ObRpcPacketCode::OB_LOG_PUSH_RESP);

    // 4. check if the leader can commit logs after D has been removed from match_lsn_map
    EXPECT_UNTIL_EQ(leader.palf_handle_impl_->sw_.committed_end_lsn_, leader.palf_handle_impl_->sw_.last_submit_end_lsn_);

    leader.reset();
    revert_cluster_palf_handle_guard(palf_list);
  }
  delete_paxos_group(id);
  PALF_LOG(INFO, "end test test_committed_end_lsn_after_removing_member", K(id));
}
|
||||
|
||||
} // end unittest
|
||||
} // end oceanbase
|
||||
|
||||
|
208
mittest/logservice/test_ob_simple_log_flashback_arb.cpp
Normal file
208
mittest/logservice/test_ob_simple_log_flashback_arb.cpp
Normal file
@ -0,0 +1,208 @@
|
||||
// Copyright (c) 2021 OceanBase
|
||||
// OceanBase is licensed under Mulan PubL v2.
|
||||
// You can use this software according to the terms and conditions of the Mulan PubL v2.
|
||||
// You may obtain a copy of Mulan PubL v2 at:
|
||||
// http://license.coscl.org.cn/MulanPubL-2.0
|
||||
// THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
|
||||
// EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
|
||||
// MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
|
||||
// See the Mulan PubL v2 for more details.
|
||||
#include "lib/file/file_directory_utils.h"
|
||||
#include "lib/utility/ob_macro_utils.h"
|
||||
#include "logservice/palf/log_define.h"
|
||||
#include <cstdio>
|
||||
#include <gtest/gtest.h>
|
||||
#include <signal.h>
|
||||
#include "lib/utility/ob_defer.h"
|
||||
#define private public
|
||||
#include "env/ob_simple_log_cluster_env.h"
|
||||
|
||||
#undef private
|
||||
|
||||
const std::string TEST_NAME = "flashback_arb";
|
||||
using namespace oceanbase::common;
|
||||
using namespace oceanbase;
|
||||
namespace oceanbase
|
||||
{
|
||||
using namespace logservice;
|
||||
|
||||
namespace logservice
|
||||
{
|
||||
int LogRequestHandler::change_access_mode_(const LogChangeAccessModeCmd &req)
|
||||
{
|
||||
int ret = common::OB_SUCCESS;
|
||||
if (false == req.is_valid()) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
CLOG_LOG(ERROR, "Invalid argument!!!", K(ret), K(req));
|
||||
} else {
|
||||
palf::PalfHandleGuard palf_handle_guard;
|
||||
const int64_t palf_id = req.ls_id_;
|
||||
const common::ObAddr &server = req.src_;
|
||||
int64_t proposal_id = palf::INVALID_PROPOSAL_ID;
|
||||
common::ObRole role = ObRole::FOLLOWER;
|
||||
if (OB_FAIL(get_palf_handle_guard_(palf_id, palf_handle_guard))) {
|
||||
CLOG_LOG(WARN, "get_palf_handle_guard_ failed", K(ret), K(palf_id));
|
||||
} else if (OB_FAIL(palf_handle_guard.get_role(role, proposal_id))) {
|
||||
} else if (OB_FAIL(palf_handle_guard.change_access_mode(proposal_id, req.mode_version_,
|
||||
req.access_mode_, req.ref_scn_))) {
|
||||
CLOG_LOG(WARN, "change_access_mode failed", K(ret), K(palf_id), K(server));
|
||||
} else {
|
||||
CLOG_LOG(INFO, "change_access_mode success", K(ret), K(req));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObLogFlashbackService::get_ls_list_(const uint64_t tenant_id,
|
||||
share::ObLSStatusInfoArray &ls_array)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
common::ObFunction<int(const palf::PalfHandle&)> get_palf_info =
|
||||
[&](const palf::PalfHandle &palf_handle)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
share::ObLSStatusInfo ls_status;
|
||||
int64_t palf_id = -1;
|
||||
palf_handle.get_palf_id(palf_id);
|
||||
share::ObLSFlag flag(share::ObLSFlag::NORMAL_FLAG);
|
||||
if (OB_FAIL(ls_status.init(tenant_id, share::ObLSID(palf_id), 1, share::ObLSStatus::OB_LS_NORMAL, 1, "z1", flag))) {
|
||||
CLOG_LOG(WARN, "ls_status init failed", K(ret), K(palf_id));
|
||||
} else if (OB_FAIL(ls_array.push_back(ls_status))) {
|
||||
CLOG_LOG(WARN, "ls_array push_back failed", K(ret), K(palf_id));
|
||||
}
|
||||
return ret;
|
||||
};
|
||||
logservice::ObLogService *log_service = NULL;
|
||||
log_service = MTL(logservice::ObLogService*);
|
||||
if (false == get_palf_info.is_valid()) {
|
||||
CLOG_LOG(ERROR, "invalid ObFunction", K(ret));
|
||||
} else if (OB_FAIL(log_service->iterate_palf(get_palf_info))) {
|
||||
CLOG_LOG(ERROR, "iterate_palf failed", K(ret));
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObLogFlashbackService::BaseLSOperator::update_leader_()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
leader_.reset();
|
||||
logservice::ObLogService *log_service = NULL;
|
||||
log_service = MTL(logservice::ObLogService*);
|
||||
palf::PalfHandleGuard palf_handle;
|
||||
if (OB_FAIL(log_service->open_palf(ls_id_, palf_handle))) {
|
||||
CLOG_LOG(ERROR, "open_palf failed", K(ret), K(ls_id));
|
||||
} else {
|
||||
palf::PalfHandleImpl *palf_handle_impl = dynamic_cast<palf::PalfHandleImpl*>(palf_handle.palf_handle_.palf_handle_impl_);
|
||||
leader_ = palf_handle_impl->state_mgr_.get_leader();
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
// Copies the address stored in the current tenant's ObLogService (self_)
// into `self`.
//
// @param self  output; receives log_service->self_
// @return always OB_SUCCESS
int LogRequestHandler::get_self_addr_(common::ObAddr &self) const
{
  logservice::ObLogService *const log_service = MTL(logservice::ObLogService*);
  self = log_service->self_;
  return OB_SUCCESS;
}
|
||||
|
||||
}
|
||||
|
||||
namespace unittest
|
||||
{
|
||||
class TestObSimpleLogClusterFlashbackArb : public ObSimpleLogClusterTestEnv
|
||||
{
|
||||
public:
|
||||
TestObSimpleLogClusterFlashbackArb() {}
|
||||
};
|
||||
|
||||
int64_t ObSimpleLogClusterTestBase::member_cnt_ = 3;
|
||||
int64_t ObSimpleLogClusterTestBase::node_cnt_ = 3;
|
||||
std::string ObSimpleLogClusterTestBase::test_name_ = TEST_NAME;
|
||||
bool ObSimpleLogClusterTestBase::need_add_arb_server_ = true;
|
||||
|
||||
// 2F1A
|
||||
// 1. A reconfiguration (upgrade B) has been executed successfully with log_barrier 100
|
||||
// 2. the palf group is flashed back to 50, but reconfig_barrier in LogConfigMgr is still 100
|
||||
// 3. change to APPEND mode
|
||||
// 4. block_pcode PUSH_LOG_RESP, so the leader can not commit logs by itself
|
||||
// 5. append logs
|
||||
// 6. logs in (50, 100) must not be committed by prev_member_list(A)
|
||||
// Checks that, after a degrade/upgrade reconfiguration followed by a
// flashback to an earlier SCN, the leader does not advance committed_end_lsn
// on its own while OB_LOG_PUSH_RESP is blocked.
TEST_F(TestObSimpleLogClusterFlashbackArb, test_flashback_after_upgrading)
{
  int ret = OB_SUCCESS;
  const int64_t id = ATOMIC_AAF(&palf_id_, 1);
  const int64_t CONFIG_CHANGE_TIMEOUT = 10 * 1000 * 1000L;
  OB_LOGGER.set_log_level("TRACE");
  SET_CASE_LOG_FILE(TEST_NAME, "test_flashback_after_upgrading");
  PALF_LOG(INFO, "begin test test_flashback_after_upgrading", K(id));
  {
    int64_t leader_idx = 0;
    int64_t arb_replica_idx = 0;
    PalfHandleImplGuard leader;
    EXPECT_EQ(OB_SUCCESS, create_paxos_group_with_arb_mock_election(id, arb_replica_idx, leader_idx, leader));
    EXPECT_EQ(OB_SUCCESS, submit_log(leader, 200, id));

    // dynamic_cast<ObSimpleLogServer*>(get_cluster()[leader_idx])->log_service_.get_arbitration_service()->stop();
    const int64_t b_idx = (leader_idx + 1) % 3;
    // Remember the SCN to flash back to, then write more logs past it.
    const SCN flashback_scn = leader.palf_handle_impl_->sw_.get_max_scn();
    EXPECT_EQ(OB_SUCCESS, submit_log(leader, 200, id));
    const LSN before_flashback_max_lsn = leader.palf_handle_impl_->sw_.get_max_lsn();

    // 1. block_net and degrade
    block_net(leader_idx, b_idx);
    is_degraded(leader, b_idx);

    // 2. unblock_net and upgrade
    unblock_net(leader_idx, b_idx);
    is_upgraded(leader, id);

    // 3. flashback
    const LSN barrier_end_lsn = leader.palf_handle_impl_->config_mgr_.reconfig_barrier_.prev_end_lsn_;
    int64_t mode_version = 0;
    switch_append_to_raw_write(leader, mode_version);
    // NOTE(review): the service is taken from server 0 while the tenant is set from leader_idx — confirm this is intended.
    ObLogFlashbackService *flashback_srv = get_cluster()[0]->get_flashback_service();
    ObTenantEnv::set_tenant(get_cluster()[leader_idx]->get_tenant_base());
    EXPECT_EQ(OB_SUCCESS, flashback_srv->flashback(MTL_ID(), flashback_scn, CONFIG_CHANGE_TIMEOUT));
    switch_flashback_to_append(leader, mode_version);
    EXPECT_UNTIL_EQ(leader.palf_handle_impl_->sw_.get_max_lsn(), leader.palf_handle_impl_->sw_.committed_end_lsn_);
    const LSN after_flashback_max_lsn = leader.palf_handle_impl_->sw_.get_max_lsn();
    const LSN after_flashback_end_lsn = leader.palf_handle_impl_->sw_.committed_end_lsn_;
    // The log was truncated, and the old reconfig barrier now lies beyond the committed point.
    EXPECT_GT(before_flashback_max_lsn, after_flashback_max_lsn);
    EXPECT_GT(barrier_end_lsn, after_flashback_end_lsn);

    // 4. submit logs, the leader must not commit logs by itself
    block_pcode(leader_idx, ObRpcPacketCode::OB_LOG_PUSH_RESP);
    EXPECT_EQ(OB_SUCCESS, submit_log(leader, 50, id));
    const LSN curr_max_lsn = leader.palf_handle_impl_->sw_.get_max_lsn();
    sleep(5);
    EXPECT_GT(curr_max_lsn, leader.palf_handle_impl_->sw_.committed_end_lsn_);
    EXPECT_EQ(after_flashback_end_lsn.val_, leader.palf_handle_impl_->sw_.committed_end_lsn_.val_) \
      << after_flashback_end_lsn.val_ << ", " << leader.palf_handle_impl_->sw_.committed_end_lsn_.val_;

    // 5. clear env
    unblock_pcode(leader_idx, ObRpcPacketCode::OB_LOG_PUSH_RESP);
  }
  delete_paxos_group(id);
  PALF_LOG(INFO, "end test test_flashback_after_upgrading", K(id));
}
|
||||
|
||||
} // end unittest
|
||||
} // end oceanbase
|
||||
|
||||
// Notes: How to write a new module integrate test case in logservice?
|
||||
// 1. cp test_ob_simple_log_basic_func.cpp test_ob_simple_log_xxx.cpp
|
||||
// 2. modify const string TEST_NAME, class name and log file name in test_ob_simple_log_xxx.cpp
|
||||
// 3. add ob_unittest_clog() item and set label for test_ob_simple_log_xxx in unittest/cluster/CMakeLists.txt
|
||||
// 4. write new TEST_F
|
||||
|
||||
// Program entry point: the whole body is the RUN_SIMPLE_LOG_CLUSTER_TEST macro, invoked with this file's TEST_NAME.
int main(int argc, char **argv)
|
||||
{
|
||||
RUN_SIMPLE_LOG_CLUSTER_TEST(TEST_NAME);
|
||||
}
|
@ -444,15 +444,26 @@ int LogConfigMgr::get_log_sync_member_list_for_generate_committed_lsn(
|
||||
int ret = OB_SUCCESS;
|
||||
LSN prev_committed_end_lsn;
|
||||
sw_->get_committed_end_lsn(prev_committed_end_lsn);
|
||||
const int64_t prev_mode_pid = mode_mgr_->get_last_submit_mode_meta().proposal_id_;
|
||||
is_before_barrier = false;
|
||||
barrier_lsn = LSN(PALF_INITIAL_LSN_VAL);
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
PALF_LOG(WARN, "LogConfigMgr not init", KR(ret));
|
||||
} else if (OB_UNLIKELY(prev_committed_end_lsn < reconfig_barrier_.prev_end_lsn_ &&
|
||||
reconfig_barrier_.prev_end_lsn_.is_valid())) {
|
||||
reconfig_barrier_.prev_end_lsn_.is_valid() &&
|
||||
prev_mode_pid == reconfig_barrier_.prev_mode_pid_)) {
|
||||
is_before_barrier = true;
|
||||
barrier_lsn = reconfig_barrier_.prev_end_lsn_;
|
||||
// Scenario: 2F1A
|
||||
// 1. A reconfiguration (upgrade B) has been executed successfully with log_barrier 100
|
||||
// 2. the palf group is flashed back to 50, but reconfig_barrier in LogConfigMgr is still 100
|
||||
// 3. change to APPEND mode
|
||||
// 4. the leader may commit logs in (50, 100) by prev_member_list(A)
|
||||
// Note: to address above issue, we check mode_proposal_id. The previous memberlist will
|
||||
// be used only when the reconfig_barrier_.prev_mode_pid_ is equal to current mode
|
||||
// proposal_id. That means access mode hasn’t been changed (PALF hasn’t been flashed back)
|
||||
// since last reconfiguration.
|
||||
if (OB_FAIL(member_list.deep_copy(log_ms_meta_.prev_.config_.log_sync_memberlist_))) {
|
||||
PALF_LOG(WARN, "deep_copy member_list failed", KR(ret), K_(palf_id), K_(self));
|
||||
} else {
|
||||
|
@ -159,15 +159,15 @@ int LogModeMgr::get_ref_scn(int64_t &mode_version, SCN &ref_scn) const
|
||||
return ret;
|
||||
}
|
||||
|
||||
// require rlock of PalfHandleImpl
|
||||
// Returns a copy of accepted_mode_meta_; the copy is made while an ObSpinLockGuard on lock_ is held.
LogModeMeta LogModeMgr::get_accepted_mode_meta() const
|
||||
{
|
||||
common::ObSpinLockGuard guard(lock_);
|
||||
return accepted_mode_meta_;
|
||||
}
|
||||
|
||||
// require rlock of PalfHandleImpl
|
||||
// Returns a copy of last_submit_mode_meta_; the copy is made while an ObSpinLockGuard on lock_ is held.
LogModeMeta LogModeMgr::get_last_submit_mode_meta() const
|
||||
{
|
||||
common::ObSpinLockGuard guard(lock_);
|
||||
return last_submit_mode_meta_;
|
||||
}
|
||||
|
||||
@ -674,6 +674,7 @@ int LogModeMgr::handle_prepare_response(const common::ObAddr &server,
|
||||
return ret;
|
||||
}
|
||||
|
||||
// require rlock of PalfHandleImpl
|
||||
bool LogModeMgr::can_receive_mode_meta(const int64_t proposal_id,
|
||||
const LogModeMeta &mode_meta,
|
||||
bool &has_accepted)
|
||||
@ -700,6 +701,7 @@ bool LogModeMgr::can_receive_mode_meta(const int64_t proposal_id,
|
||||
return bool_ret;
|
||||
}
|
||||
|
||||
// require wlock of PalfHandleImpl
|
||||
int LogModeMgr::receive_mode_meta(const common::ObAddr &server,
|
||||
const int64_t proposal_id,
|
||||
const bool is_applied_mode_meta,
|
||||
@ -748,7 +750,7 @@ int LogModeMgr::receive_mode_meta_(const common::ObAddr &server,
|
||||
return ret;
|
||||
}
|
||||
|
||||
// is_applied_mode_meta is true, caller should hold Wlock in PalfHandleImpl
|
||||
// require wlock of PalfHandleImpl
|
||||
int LogModeMgr::after_flush_mode_meta(const bool is_applied_mode_meta, const LogModeMeta &mode_meta)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
@ -148,6 +148,7 @@ private:
|
||||
// NB: protected by SpinLock
|
||||
// log_mode_meta has been submitted to I/O Worker
|
||||
LogModeMeta last_submit_mode_meta_;
|
||||
// above LogModeMetas are protected by lock_ in PalfHandleImpl
|
||||
// =========access_mode changing state============
|
||||
// mode change state
|
||||
ModeChangeState state_;
|
||||
|
@ -2788,21 +2788,27 @@ int LogSlidingWindow::get_majority_lsn_(const ObMemberList &member_list,
|
||||
ObAddr tmp_server;
|
||||
LsnTsInfo tmp_val;
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < member_list.get_member_number(); ++i) {
|
||||
int tmp_ret = OB_SUCCESS;
|
||||
tmp_server.reset();
|
||||
if (OB_FAIL(member_list.get_server_by_index(i, tmp_server))) {
|
||||
PALF_LOG(WARN, "get_server_by_index failed", K(ret), K_(palf_id), K_(self));
|
||||
} else if (OB_FAIL(match_lsn_map_.get(tmp_server, tmp_val))) {
|
||||
// Not expected to fail: the map is updated on every membership change so that it stays consistent with member_list
|
||||
PALF_LOG(WARN, "match_lsn_map_ get failed", K(ret), K_(palf_id), K_(self), K(tmp_server));
|
||||
} else if (OB_TMP_FAIL(match_lsn_map_.get(tmp_server, tmp_val))) {
|
||||
// Note: the leader may generate committed_end_lsn based on previous member list,
|
||||
// members in member_list may not exist in match_lsn_map. For example, removing D from
|
||||
// (ABCD), previous member_list is (ABCD) but D has been removed from match_lsn_map.
|
||||
// Therefore, we just skip members that do not exist in match_lsn_map.
|
||||
PALF_LOG(WARN, "match_lsn_map_ get failed", K(tmp_ret), K_(palf_id), K_(self), K(tmp_server));
|
||||
} else {
|
||||
valid_member_cnt++;
|
||||
lsn_array[i] = tmp_val.lsn_;
|
||||
lsn_array[valid_member_cnt++] = tmp_val.lsn_;
|
||||
PALF_LOG(TRACE, "current matched lsn", K_(palf_id), K_(self), "server:", tmp_server, "lsn:", tmp_val.lsn_);
|
||||
}
|
||||
}
|
||||
} while(0);
|
||||
|
||||
if (OB_SUCC(ret)) {
|
||||
if (valid_member_cnt < replica_num / 2 + 1) {
|
||||
PALF_LOG(WARN, "match_lsn_map do not reach majority", K(ret), K_(palf_id), K_(self),
|
||||
K(member_list), K(replica_num), K(valid_member_cnt));
|
||||
} else if (OB_SUCC(ret)) {
|
||||
std::sort(lsn_array, lsn_array + valid_member_cnt, LSNCompare());
|
||||
assert(replica_num / 2 < OB_MAX_MEMBER_NUMBER);
|
||||
result_lsn = lsn_array[replica_num / 2];
|
||||
|
@ -1060,6 +1060,7 @@ int PalfHandleImpl::one_stage_config_change_(const LogConfigChangeArgs &args,
|
||||
// adding D, and member_list of D is still empty, D can't vote for anyone, therefore no one
|
||||
// can be elected to be leader.
|
||||
if (is_add_member_list(args.type_)) {
|
||||
RLockGuard guard(lock_);
|
||||
(void) config_mgr_.pre_sync_config_log_and_mode_meta(args.server_, proposal_id);
|
||||
}
|
||||
// step 2: config change remote precheck
|
||||
@ -1089,6 +1090,7 @@ int PalfHandleImpl::one_stage_config_change_(const LogConfigChangeArgs &args,
|
||||
K(ret), K_(palf_id), K_(self), K(args));
|
||||
} else {
|
||||
if (false == added_member_has_new_version) {
|
||||
RLockGuard guard(lock_);
|
||||
(void) config_mgr_.pre_sync_config_log_and_mode_meta(args.server_, proposal_id);
|
||||
}
|
||||
ret = OB_SUCCESS;
|
||||
@ -2458,8 +2460,7 @@ int PalfHandleImpl::receive_mode_meta(const common::ObAddr &server,
|
||||
PALF_LOG(WARN, "invalid arguments", K(ret), KPC(this), K(server), K(proposal_id), K(mode_meta));
|
||||
} else if (OB_FAIL(try_update_proposal_id_(server, proposal_id))) {
|
||||
PALF_LOG(WARN, "try_update_proposal_id_ failed", KR(ret), KPC(this), K(server), K(proposal_id));
|
||||
} else if (false == is_applied_mode_meta && OB_SUCCESS != (lock_ret = lock_.rdlock())) {
|
||||
} else if (true == is_applied_mode_meta && OB_SUCCESS != (lock_ret = lock_.wrlock())) {
|
||||
} else if (OB_SUCCESS != (lock_ret = lock_.wrlock())) {
|
||||
} else if (false == mode_mgr_.can_receive_mode_meta(proposal_id, mode_meta, has_accepted)) {
|
||||
PALF_LOG(WARN, "can_receive_mode_meta failed", KR(ret), KPC(this), K(proposal_id), K(mode_meta));
|
||||
} else if (true == has_accepted) {
|
||||
@ -2477,11 +2478,7 @@ int PalfHandleImpl::receive_mode_meta(const common::ObAddr &server,
|
||||
PALF_LOG(INFO, "receive_mode_meta finish", KR(ret), KPC(this), K(server), K(proposal_id),
|
||||
K(is_applied_mode_meta), K(mode_meta));
|
||||
if (OB_SUCCESS == lock_ret) {
|
||||
if (is_applied_mode_meta) {
|
||||
lock_.wrunlock();
|
||||
} else {
|
||||
lock_.rdunlock();
|
||||
}
|
||||
lock_.wrunlock();
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
@ -3813,7 +3810,7 @@ int PalfHandleImpl::inner_after_flush_meta(const FlushMetaCbCtx &flush_meta_cb_c
|
||||
PALF_LOG(INFO, "inner_after_flush_meta", K(flush_meta_cb_ctx));
|
||||
if (IS_NOT_INIT) {
|
||||
ret = OB_NOT_INIT;
|
||||
} else if (MODE_META == flush_meta_cb_ctx.type_ && true == flush_meta_cb_ctx.is_applied_mode_meta_) {
|
||||
} else if (MODE_META == flush_meta_cb_ctx.type_) {
|
||||
WLockGuard guard(lock_);
|
||||
ret = after_flush_mode_meta_(flush_meta_cb_ctx.proposal_id_,
|
||||
flush_meta_cb_ctx.is_applied_mode_meta_,
|
||||
@ -3827,11 +3824,6 @@ int PalfHandleImpl::inner_after_flush_meta(const FlushMetaCbCtx &flush_meta_cb_c
|
||||
case CHANGE_CONFIG_META:
|
||||
ret = after_flush_config_change_meta_(flush_meta_cb_ctx.proposal_id_, flush_meta_cb_ctx.config_version_);
|
||||
break;
|
||||
case MODE_META:
|
||||
ret = after_flush_mode_meta_(flush_meta_cb_ctx.proposal_id_,
|
||||
flush_meta_cb_ctx.is_applied_mode_meta_,
|
||||
flush_meta_cb_ctx.log_mode_meta_);
|
||||
break;
|
||||
case SNAPSHOT_META:
|
||||
ret = after_flush_snapshot_meta_(flush_meta_cb_ctx.base_lsn_);
|
||||
break;
|
||||
|
Loading…
x
Reference in New Issue
Block a user