// owner: zjf225077 // owner group: log /** * Copyright (c) 2021 OceanBase * OceanBase CE is licensed under Mulan PubL v2. * You can use this software according to the terms and conditions of the Mulan PubL v2. * You may obtain a copy of Mulan PubL v2 at: * http://license.coscl.org.cn/MulanPubL-2.0 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. * See the Mulan PubL v2 for more details. */ #include "lib/ob_define.h" #include "lib/ob_errno.h" #include #include #include #include #define private public #define protected public #include "env/ob_simple_log_cluster_env.h" #undef private #undef protected #include "logservice/palf/log_reader_utils.h" #include "logservice/palf/log_define.h" #include "logservice/palf/log_group_entry_header.h" #include "logservice/palf/log_io_worker.h" #include "logservice/palf/lsn.h" #include const std::string TEST_NAME = "single_replica"; using namespace oceanbase::common; using namespace oceanbase; namespace oceanbase { using namespace logservice; namespace logservice { int ObLogService::start() { int ret = OB_SUCCESS; // palf_env has been started in log_server.init() if (OB_FAIL(apply_service_.start())) { CLOG_LOG(WARN, "failed to start apply_service_", K(ret)); } else if (OB_FAIL(replay_service_.start())) { CLOG_LOG(WARN, "failed to start replay_service_", K(ret)); } else if (OB_FAIL(role_change_service_.start())) { CLOG_LOG(WARN, "failed to start role_change_service_", K(ret)); } else if (OB_FAIL(cdc_service_.start())) { CLOG_LOG(WARN, "failed to start cdc_service_", K(ret)); } else if (OB_FAIL(restore_service_.start())) { CLOG_LOG(WARN, "failed to start restore_service_", K(ret)); #ifdef OB_BUILD_ARBITRATION } else if (OB_FAIL(arb_service_.start())) { CLOG_LOG(WARN, "failed to start arb_service_", K(ret)); #endif } else { is_running_ = true; FLOG_INFO("ObLogService is started"); 
} return ret; } } namespace unittest { class TestObSimpleLogClusterSingleReplica : public ObSimpleLogClusterTestEnv { public: TestObSimpleLogClusterSingleReplica() : ObSimpleLogClusterTestEnv() { int ret = init(); if (OB_SUCCESS != ret) { throw std::runtime_error("TestObSimpleLogClusterLogEngine init failed"); } } ~TestObSimpleLogClusterSingleReplica() { destroy(); } int init() { return OB_SUCCESS; } void destroy() {} int64_t id_; PalfHandleImplGuard leader_; }; int64_t ObSimpleLogClusterTestBase::member_cnt_ = 1; int64_t ObSimpleLogClusterTestBase::node_cnt_ = 1; std::string ObSimpleLogClusterTestBase::test_name_ = TEST_NAME; bool ObSimpleLogClusterTestBase::need_add_arb_server_ = false; bool ObSimpleLogClusterTestBase::need_shared_storage_ = false; constexpr int64_t timeout_ts_us = 3 * 1000 * 1000; int64_t log_entry_size = 2 * 1024 * 1024 + 16 * 1024; void read_padding_entry(PalfHandleImplGuard &leader, SCN padding_scn, LSN padding_log_lsn) { // 从padding group entry开始读取 { PalfBufferIterator iterator; EXPECT_EQ(OB_SUCCESS, leader.get_palf_handle_impl()->alloc_palf_buffer_iterator(padding_log_lsn, iterator)); EXPECT_EQ(OB_SUCCESS, iterator.next()); LogEntry padding_log_entry; LSN check_lsn; EXPECT_EQ(OB_SUCCESS, iterator.get_entry(padding_log_entry, check_lsn)); EXPECT_EQ(true, padding_log_entry.header_.is_padding_log_()); EXPECT_EQ(true, padding_log_entry.check_integrity()); EXPECT_EQ(padding_scn, padding_log_entry.get_scn()); } // 从padding log entry开始读取 { PalfBufferIterator iterator; EXPECT_EQ(OB_SUCCESS, leader.get_palf_handle_impl()->alloc_palf_buffer_iterator(padding_log_lsn+LogGroupEntryHeader::HEADER_SER_SIZE, iterator)); EXPECT_EQ(OB_SUCCESS, iterator.next()); LogEntry padding_log_entry; LSN check_lsn; EXPECT_EQ(OB_SUCCESS, iterator.get_entry(padding_log_entry, check_lsn)); EXPECT_EQ(true, padding_log_entry.header_.is_padding_log_()); EXPECT_EQ(true, padding_log_entry.check_integrity()); EXPECT_EQ(padding_scn, padding_log_entry.get_scn()); } } 
TEST_F(TestObSimpleLogClusterSingleReplica, delete_paxos_group) { update_server_log_disk(10*1024*1024*1024ul); update_disk_options(10*1024*1024*1024ul/palf::PALF_PHY_BLOCK_SIZE); SET_CASE_LOG_FILE(TEST_NAME, "delete_paxos_group"); const int64_t id = ATOMIC_AAF(&palf_id_, 1); PALF_LOG(INFO, "start test delete_paxos_group", K(id)); int64_t leader_idx = 0; { unittest::PalfHandleImplGuard leader; EXPECT_EQ(OB_SUCCESS, create_paxos_group(id, leader_idx, leader)); EXPECT_EQ(OB_SUCCESS, submit_log(leader, 100, leader_idx)); } sleep(1); // EXPECT_EQ(OB_SUCCESS, delete_paxos_group(id)); // TODO by yunlong: check log sync PALF_LOG(INFO, "end test delete_paxos_group", K(id)); } TEST_F(TestObSimpleLogClusterSingleReplica, single_replica_flashback) { SET_CASE_LOG_FILE(TEST_NAME, "single_replica_flashback"); OB_LOGGER.set_log_level("INFO"); const int64_t id = ATOMIC_AAF(&palf_id_, 1); int64_t leader_idx = 0; PALF_LOG(INFO, "start test single replica flashback", K(id)); SCN max_scn; unittest::PalfHandleImplGuard leader; int64_t mode_version = INVALID_PROPOSAL_ID; SCN ref_scn; EXPECT_EQ(OB_SUCCESS, create_paxos_group(id, leader_idx, leader)); { SCN tmp_scn; LSN tmp_lsn; // 提交1条日志后进行flashback EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, leader_idx, 100)); EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(leader, leader.palf_handle_impl_->sw_.get_max_lsn())); tmp_scn = leader.palf_handle_impl_->get_max_scn(); switch_append_to_flashback(leader, mode_version); EXPECT_EQ(OB_SUCCESS, leader.palf_handle_impl_->flashback(mode_version, SCN::minus(tmp_scn, 10), timeout_ts_us)); // 预期日志起点为LSN(0) EXPECT_EQ(LSN(0), leader.palf_handle_impl_->get_max_lsn()); EXPECT_EQ(SCN::minus(tmp_scn, 10), leader.palf_handle_impl_->get_max_scn()); EXPECT_EQ(LSN(0), leader.palf_handle_impl_->log_engine_.log_storage_.log_tail_); // flashback到PADDING日志 switch_flashback_to_append(leader, mode_version); EXPECT_EQ(OB_SUCCESS, submit_log(leader, 31, leader_idx, log_entry_size)); EXPECT_EQ(OB_SUCCESS, 
wait_until_has_committed(leader, leader.palf_handle_impl_->sw_.get_max_lsn())); EXPECT_EQ(OB_ITER_END, read_log(leader)); EXPECT_GT(LSN(PALF_BLOCK_SIZE), leader.palf_handle_impl_->sw_.get_max_lsn()); int remained_log_size = LSN(PALF_BLOCK_SIZE) - leader.palf_handle_impl_->sw_.get_max_lsn(); EXPECT_LT(remained_log_size, log_entry_size); int need_log_size = remained_log_size - 5*1024; PALF_LOG(INFO, "runlin trace print sw1", K(leader.palf_handle_impl_->sw_)); // 保证末尾只剩小于1KB的空间 EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, leader_idx, need_log_size)); PALF_LOG(INFO, "runlin trace print sw2", K(leader.palf_handle_impl_->sw_)); SCN mid_scn; LogEntryHeader header; // 此时一共存在32条日志 EXPECT_EQ(OB_SUCCESS, get_middle_scn(32, leader, mid_scn, header)); EXPECT_EQ(OB_ITER_END, get_middle_scn(33, leader, mid_scn, header)); EXPECT_GT(LSN(PALF_BLOCK_SIZE), leader.palf_handle_impl_->sw_.get_max_lsn()); remained_log_size = LSN(PALF_BLOCK_SIZE) - leader.palf_handle_impl_->sw_.get_max_lsn(); EXPECT_LT(remained_log_size, 5*1024); EXPECT_GT(remained_log_size, 0); // 写一条大小为5KB的日志 LSN padding_log_lsn = leader.get_palf_handle_impl()->get_max_lsn(); EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, leader_idx, 5*1024)); EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(leader, leader.get_palf_handle_impl()->get_max_lsn())); // 验证读取padding是否成功 { share::SCN padding_scn = leader.get_palf_handle_impl()->get_max_scn(); padding_scn = padding_scn.minus(padding_scn, 1); read_padding_entry(leader, padding_scn, padding_log_lsn); } PALF_LOG(INFO, "runlin trace print sw3", K(leader.palf_handle_impl_->sw_)); // Padding日志占用日志条数,因此存在34条日志 EXPECT_EQ(OB_SUCCESS, get_middle_scn(33, leader, mid_scn, header)); EXPECT_EQ(OB_SUCCESS, get_middle_scn(34, leader, mid_scn, header)); EXPECT_EQ(OB_ITER_END, get_middle_scn(35, leader, mid_scn, header)); EXPECT_LT(LSN(PALF_BLOCK_SIZE), leader.palf_handle_impl_->sw_.get_max_lsn()); max_scn = leader.palf_handle_impl_->sw_.get_max_scn(); EXPECT_EQ(OB_SUCCESS, 
wait_until_has_committed(leader, leader.palf_handle_impl_->get_max_lsn())); switch_append_to_flashback(leader, mode_version); // flashback到padding日志尾部 tmp_scn = leader.palf_handle_impl_->get_max_scn(); EXPECT_EQ(OB_SUCCESS, leader.palf_handle_impl_->flashback(mode_version, SCN::minus(tmp_scn, 1), timeout_ts_us)); PALF_LOG(INFO, "flashback to padding tail"); EXPECT_EQ(leader.palf_handle_impl_->get_max_lsn(), LSN(PALF_BLOCK_SIZE)); EXPECT_EQ(OB_ITER_END, read_log(leader)); // flashback后存在33条日志(包含padding日志) EXPECT_EQ(OB_SUCCESS, get_middle_scn(33, leader, mid_scn, header)); EXPECT_EQ(OB_ITER_END, get_middle_scn(34, leader, mid_scn, header)); // 验证读取padding是否成功 { share::SCN padding_scn = leader.get_palf_handle_impl()->get_max_scn(); padding_scn.minus(padding_scn, 1); PALF_LOG(INFO, "begin read_padding_entry", K(padding_scn), K(padding_log_lsn)); read_padding_entry(leader, padding_scn, padding_log_lsn); } // flashback到padding日志头部,磁盘上还有32条日志 tmp_scn = leader.palf_handle_impl_->get_max_scn(); EXPECT_EQ(OB_SUCCESS, leader.palf_handle_impl_->flashback(mode_version, SCN::minus(tmp_scn, 1), timeout_ts_us)); EXPECT_LT(leader.palf_handle_impl_->get_max_lsn(), LSN(PALF_BLOCK_SIZE)); EXPECT_EQ(OB_SUCCESS, get_middle_scn(32, leader, mid_scn, header)); EXPECT_EQ(OB_ITER_END, get_middle_scn(33, leader, mid_scn, header)); EXPECT_EQ(padding_log_lsn, leader.palf_handle_impl_->get_max_lsn()); switch_flashback_to_append(leader, mode_version); EXPECT_EQ(OB_SUCCESS, submit_log(leader, 10, leader_idx, 1000)); EXPECT_EQ(OB_SUCCESS, wait_lsn_until_flushed(LSN(PALF_BLOCK_SIZE), leader)); EXPECT_EQ(OB_ITER_END, read_log(leader)); switch_append_to_flashback(leader, mode_version); EXPECT_EQ(OB_SUCCESS, leader.palf_handle_impl_->flashback(mode_version, SCN::min_scn(), timeout_ts_us)); EXPECT_EQ(LSN(0), leader.palf_handle_impl_->get_max_lsn()); switch_flashback_to_append(leader, mode_version); ref_scn.convert_for_tx(10000); EXPECT_EQ(OB_SUCCESS, submit_log(leader, ref_scn, tmp_lsn, tmp_scn)); LSN 
tmp_lsn1 = leader.palf_handle_impl_->get_max_lsn(); ref_scn.convert_for_tx(50000); EXPECT_EQ(OB_SUCCESS, submit_log(leader, ref_scn, tmp_lsn, tmp_scn)); sleep(1); wait_until_has_committed(leader, leader.palf_handle_impl_->sw_.get_max_lsn()); switch_append_to_flashback(leader, mode_version); ref_scn.convert_for_tx(30000); EXPECT_EQ(OB_SUCCESS, leader.palf_handle_impl_->flashback(mode_version, ref_scn, timeout_ts_us)); // 验证重复的flashback任务 EXPECT_EQ(OB_SUCCESS, leader.palf_handle_impl_->inner_flashback(ref_scn)); EXPECT_EQ(tmp_lsn1, leader.palf_handle_impl_->log_engine_.log_storage_.log_tail_); // 验证flashback时间戳比过小 ref_scn.convert_from_ts(1); EXPECT_EQ(OB_SUCCESS, leader.palf_handle_impl_->inner_flashback(ref_scn)); EXPECT_GT(tmp_lsn1, leader.palf_handle_impl_->log_engine_.log_storage_.log_tail_); CLOG_LOG(INFO, "runlin trace 3"); } switch_flashback_to_append(leader, mode_version); EXPECT_EQ(OB_SUCCESS, submit_log(leader, 300, leader_idx)); wait_until_has_committed(leader, leader.palf_handle_impl_->sw_.get_max_lsn()); EXPECT_EQ(OB_ITER_END, read_log(leader)); // flashback到中间某条日志 // 1. 
比较log_storage和日位点和滑动窗口是否相同 switch_append_to_flashback(leader, mode_version); LogEntryHeader header_origin; EXPECT_EQ(OB_SUCCESS, get_middle_scn(200, leader, max_scn, header_origin)); EXPECT_EQ(OB_SUCCESS, leader.palf_handle_impl_->flashback(mode_version, max_scn, timeout_ts_us)); LogEntryHeader header_new; SCN new_scn; EXPECT_EQ(OB_SUCCESS, get_middle_scn(200, leader, new_scn, header_new)); EXPECT_EQ(new_scn, max_scn); EXPECT_EQ(header_origin.data_checksum_, header_origin.data_checksum_); switch_flashback_to_append(leader, mode_version); LSN new_log_tail = leader.palf_handle_impl_->log_engine_.log_storage_.log_tail_; EXPECT_EQ(new_log_tail, leader.palf_handle_impl_->sw_.committed_end_lsn_); EXPECT_EQ(max_scn, leader.palf_handle_impl_->sw_.last_slide_scn_); EXPECT_EQ(OB_ITER_END, read_log(leader)); // 验证flashback后能否继续提交日志 EXPECT_EQ(OB_SUCCESS, submit_log(leader, 500, leader_idx)); wait_until_has_committed(leader, leader.palf_handle_impl_->sw_.get_max_lsn()); EXPECT_EQ(OB_ITER_END, read_log(leader)); // 再次执行flashback到上一次的flashback位点 switch_append_to_flashback(leader, mode_version); EXPECT_EQ(OB_SUCCESS, leader.palf_handle_impl_->flashback(mode_version, max_scn, timeout_ts_us)); switch_flashback_to_append(leader, mode_version); EXPECT_EQ(new_log_tail, leader.palf_handle_impl_->sw_.committed_end_lsn_); EXPECT_EQ(max_scn, leader.palf_handle_impl_->sw_.last_slide_scn_); EXPECT_EQ(OB_ITER_END, read_log(leader)); EXPECT_EQ(OB_SUCCESS, submit_log(leader, 500, leader_idx)); wait_until_has_committed(leader, leader.palf_handle_impl_->sw_.get_max_lsn()); EXPECT_EQ(OB_ITER_END, read_log(leader)); // 再次执行flashback到上一次的flashback后提交日志的某个时间点 EXPECT_EQ(OB_SUCCESS, get_middle_scn(634, leader, max_scn, header_origin)); switch_append_to_flashback(leader, mode_version); EXPECT_EQ(OB_SUCCESS, leader.palf_handle_impl_->flashback(mode_version, max_scn, timeout_ts_us)); switch_flashback_to_append(leader, mode_version); new_log_tail = 
leader.palf_handle_impl_->log_engine_.log_storage_.log_tail_; EXPECT_EQ(max_scn, leader.palf_handle_impl_->sw_.last_slide_scn_); EXPECT_EQ(OB_ITER_END, read_log(leader)); EXPECT_EQ(OB_SUCCESS, submit_log(leader, 300, leader_idx)); wait_until_has_committed(leader, leader.palf_handle_impl_->sw_.get_max_lsn()); EXPECT_EQ(OB_ITER_END, read_log(leader)); PALF_LOG(INFO, "flashback to middle success"); // flashback到某个更大的时间点 max_scn = leader.palf_handle_impl_->get_end_scn(); new_log_tail = leader.palf_handle_impl_->log_engine_.log_storage_.log_tail_; switch_append_to_flashback(leader, mode_version); EXPECT_EQ(OB_SUCCESS, leader.palf_handle_impl_->flashback(mode_version, SCN::plus(max_scn, 1000000000000), timeout_ts_us)); switch_flashback_to_append(leader, mode_version); new_log_tail = leader.palf_handle_impl_->log_engine_.log_storage_.log_tail_; EXPECT_EQ(new_log_tail.val_, leader.palf_handle_impl_->sw_.committed_end_lsn_.val_); EXPECT_EQ(max_scn, leader.palf_handle_impl_->sw_.last_slide_scn_); EXPECT_EQ(OB_ITER_END, read_log(leader)); PALF_LOG(INFO, "flashback to greater success"); EXPECT_EQ(OB_SUCCESS, submit_log(leader, 300, leader_idx)); wait_until_has_committed(leader, leader.palf_handle_impl_->sw_.get_max_lsn()); new_log_tail = leader.palf_handle_impl_->get_max_lsn(); max_scn = leader.palf_handle_impl_->get_max_scn(); PALF_LOG(INFO, "runlin trace 3", K(new_log_tail), K(max_scn)); switch_append_to_flashback(leader, mode_version); LSN new_log_tail_1 = leader.palf_handle_impl_->get_end_lsn(); SCN max_scn1 = leader.palf_handle_impl_->get_end_scn(); PALF_LOG(INFO, "runlin trace 4", K(new_log_tail), K(max_scn), K(new_log_tail_1), K(max_scn1)); EXPECT_EQ(OB_SUCCESS, leader.palf_handle_impl_->flashback(mode_version, max_scn, timeout_ts_us)); LSN log_tail_after_flashback = leader.palf_handle_impl_->get_end_lsn(); SCN max_ts_after_flashback = leader.palf_handle_impl_->get_end_scn(); PALF_LOG(INFO, "runlin trace 5", K(log_tail_after_flashback), K(max_ts_after_flashback)); 
switch_flashback_to_append(leader, mode_version); EXPECT_EQ(new_log_tail, leader.palf_handle_impl_->sw_.committed_end_lsn_); EXPECT_EQ(OB_ITER_END, read_log(leader)); PALF_LOG(INFO, "flashback to max_scn success"); // 再次执行flashback到提交日志前的max_scn EXPECT_EQ(OB_SUCCESS, submit_log(leader, 300, leader_idx)); wait_until_has_committed(leader, leader.palf_handle_impl_->sw_.get_max_lsn()); LSN curr_lsn = leader.palf_handle_impl_->get_end_lsn(); EXPECT_NE(curr_lsn, new_log_tail); EXPECT_EQ(OB_ITER_END, read_log(leader)); switch_append_to_flashback(leader, mode_version); EXPECT_EQ(OB_SUCCESS, leader.palf_handle_impl_->flashback(mode_version, max_scn, timeout_ts_us)); switch_flashback_to_append(leader, mode_version); EXPECT_EQ(new_log_tail, leader.palf_handle_impl_->get_end_lsn()); EXPECT_EQ(OB_ITER_END, read_log(leader)); // flashback reconfirming leader EXPECT_EQ(OB_SUCCESS, submit_log(leader, 100, leader_idx)); wait_until_has_committed(leader, leader.palf_handle_impl_->sw_.get_max_lsn()); SCN flashback_scn = leader.palf_handle_impl_->get_max_scn(); EXPECT_EQ(OB_SUCCESS, submit_log(leader, 100, leader_idx)); wait_until_has_committed(leader, leader.palf_handle_impl_->sw_.get_max_lsn()); EXPECT_EQ(OB_ITER_END, read_log(leader)); switch_append_to_flashback(leader, mode_version); dynamic_cast(get_cluster()[0]->get_palf_env())->log_loop_thread_.stop(); dynamic_cast(get_cluster()[0]->get_palf_env())->log_loop_thread_.wait(); leader.palf_handle_impl_->state_mgr_.role_ = LEADER; leader.palf_handle_impl_->state_mgr_.state_ = RECONFIRM; EXPECT_EQ(OB_EAGAIN, leader.palf_handle_impl_->flashback(mode_version, max_scn, timeout_ts_us)); EXPECT_GT(leader.palf_handle_impl_->sw_.get_max_scn(), flashback_scn); leader.palf_handle_impl_->state_mgr_.role_ = FOLLOWER; leader.palf_handle_impl_->state_mgr_.state_ = ObReplicaState::ACTIVE; EXPECT_EQ(OB_SUCCESS, leader.palf_handle_impl_->flashback(mode_version, max_scn, timeout_ts_us)); EXPECT_LT(leader.palf_handle_impl_->sw_.get_max_scn(), 
flashback_scn); EXPECT_EQ(new_log_tail, leader.palf_handle_impl_->get_end_lsn()); EXPECT_EQ(OB_ITER_END, read_log(leader)); leader.palf_handle_impl_->state_mgr_.role_ = LEADER; leader.palf_handle_impl_->state_mgr_.state_ = ObReplicaState::ACTIVE; dynamic_cast(get_cluster()[0]->get_palf_env())->log_loop_thread_.start(); switch_flashback_to_append(leader, mode_version); // 数据全部清空 wait_until_has_committed(leader, leader.palf_handle_impl_->sw_.get_max_lsn()); switch_append_to_flashback(leader, mode_version); EXPECT_EQ(OB_SUCCESS, leader.palf_handle_impl_->flashback(mode_version, SCN::min_scn(), timeout_ts_us)); EXPECT_EQ(LSN(0), leader.palf_handle_impl_->get_max_lsn()); EXPECT_EQ(SCN::min_scn(), leader.palf_handle_impl_->get_max_scn()); switch_flashback_to_append(leader, mode_version); EXPECT_EQ(OB_ITER_END, read_log(leader)); PALF_LOG(INFO, "flashback to 0 success"); leader.reset(); delete_paxos_group(id); } TEST_F(TestObSimpleLogClusterSingleReplica, single_replica_flashback_restart) { SET_CASE_LOG_FILE(TEST_NAME, "single_replica_flashback_restart"); OB_LOGGER.set_log_level("INFO"); const int64_t id = ATOMIC_AAF(&palf_id_, 1); int64_t leader_idx = 0; SCN max_scn = SCN::min_scn(); SCN ref_scn; int64_t mode_version = INVALID_PROPOSAL_ID; { unittest::PalfHandleImplGuard leader; EXPECT_EQ(OB_SUCCESS, create_paxos_group(id, leader_idx, leader)); EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1000, leader_idx, 1000)); LogEntryHeader header_origin; EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(leader, leader.palf_handle_impl_->get_max_lsn())); EXPECT_EQ(OB_SUCCESS, get_middle_scn(323, leader, max_scn, header_origin)); EXPECT_EQ(OB_SUCCESS, submit_log(leader, 100, leader_idx, 1000)); wait_until_has_committed(leader, leader.palf_handle_impl_->sw_.get_max_lsn()); EXPECT_EQ(OB_ITER_END, read_log(leader)); switch_append_to_flashback(leader, mode_version); EXPECT_EQ(OB_SUCCESS, leader.palf_handle_impl_->flashback(mode_version, max_scn, timeout_ts_us)); LogEntryHeader header_new; SCN 
new_scn; EXPECT_EQ(OB_SUCCESS, get_middle_scn(323, leader, new_scn, header_new)); EXPECT_EQ(new_scn, max_scn); EXPECT_EQ(header_origin.data_checksum_, header_new.data_checksum_); EXPECT_EQ(OB_ITER_END, get_middle_scn(324, leader, new_scn, header_new)); switch_flashback_to_append(leader, mode_version); EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1000, leader_idx, 1000)); wait_until_has_committed(leader, leader.palf_handle_impl_->sw_.get_max_lsn()); EXPECT_EQ(OB_SUCCESS, get_middle_scn(1323, leader, new_scn, header_new)); EXPECT_EQ(OB_ITER_END, get_middle_scn(1324, leader, new_scn, header_new)); EXPECT_EQ(OB_ITER_END, read_log(leader)); } EXPECT_EQ(OB_SUCCESS, restart_paxos_groups()); { // 验证重启场景 PalfHandleImplGuard new_leader; int64_t curr_mode_version = INVALID_PROPOSAL_ID; AccessMode curr_access_mode = AccessMode::INVALID_ACCESS_MODE; EXPECT_EQ(OB_SUCCESS, get_leader(id, new_leader, leader_idx)); EXPECT_EQ(OB_SUCCESS, new_leader.palf_handle_impl_->get_access_mode(curr_mode_version, curr_access_mode)); EXPECT_EQ(curr_mode_version, mode_version); EXPECT_EQ(OB_SUCCESS, submit_log(new_leader, 1000, leader_idx, 1000)); wait_until_has_committed(new_leader, new_leader.palf_handle_impl_->sw_.get_max_lsn()); EXPECT_EQ(OB_ITER_END, read_log(new_leader)); ref_scn.convert_for_tx(1000); LogEntryHeader header_new; LogStorage *log_storage = &new_leader.palf_handle_impl_->log_engine_.log_storage_; block_id_t max_block_id = log_storage->block_mgr_.max_block_id_; EXPECT_EQ(OB_SUCCESS, get_middle_scn(1329, new_leader, max_scn, header_new)); // flashback跨文件场景重启 EXPECT_EQ(OB_SUCCESS, submit_log(new_leader, 33, leader_idx, MAX_LOG_BODY_SIZE)); wait_until_has_committed(new_leader, new_leader.palf_handle_impl_->sw_.get_max_lsn()); EXPECT_LE(max_block_id, log_storage->block_mgr_.max_block_id_); switch_append_to_flashback(new_leader, mode_version); EXPECT_EQ(OB_SUCCESS, new_leader.palf_handle_impl_->flashback(mode_version, max_scn, timeout_ts_us)); EXPECT_GE(max_block_id, 
log_storage->block_mgr_.max_block_id_); switch_flashback_to_append(new_leader, mode_version); } EXPECT_EQ(OB_SUCCESS, restart_paxos_groups()); { PalfHandleImplGuard new_leader; int64_t curr_mode_version = INVALID_PROPOSAL_ID; AccessMode curr_access_mode = AccessMode::INVALID_ACCESS_MODE; EXPECT_EQ(OB_SUCCESS, get_leader(id, new_leader, leader_idx)); EXPECT_EQ(OB_SUCCESS, new_leader.palf_handle_impl_->get_access_mode(curr_mode_version, curr_access_mode)); // flashback到某个文件的尾部 EXPECT_EQ(OB_SUCCESS, submit_log(new_leader, 65, leader_idx, MAX_LOG_BODY_SIZE)); wait_until_has_committed(new_leader, new_leader.palf_handle_impl_->sw_.get_max_lsn()); switch_append_to_flashback(new_leader, mode_version); LSN lsn(PALF_BLOCK_SIZE); LogStorage *log_storage = &new_leader.palf_handle_impl_->log_engine_.log_storage_; SCN block_end_scn; { PalfGroupBufferIterator iterator; auto get_file_end_lsn = [](){ return LSN(PALF_BLOCK_SIZE); }; EXPECT_EQ(OB_SUCCESS, iterator.init(LSN(0), get_file_end_lsn, log_storage)); LogGroupEntry entry; LSN lsn; while (OB_SUCCESS == iterator.next()) { EXPECT_EQ(OB_SUCCESS, iterator.get_entry(entry, lsn)); } block_end_scn = entry.get_scn(); } EXPECT_EQ(OB_SUCCESS, new_leader.palf_handle_impl_->flashback(mode_version, block_end_scn, timeout_ts_us)); EXPECT_EQ(lsn, log_storage->log_tail_); EXPECT_EQ(OB_ITER_END, read_log(new_leader)); } EXPECT_EQ(OB_SUCCESS, restart_paxos_groups()); // 重启后继续提交日志 { PalfHandleImplGuard new_leader; EXPECT_EQ(OB_SUCCESS, get_leader(id, new_leader, leader_idx)); switch_flashback_to_append(new_leader, mode_version); EXPECT_EQ(true, 0 == lsn_2_offset(new_leader.get_palf_handle_impl()->get_max_lsn(), PALF_BLOCK_SIZE)); share::SCN padding_scn = new_leader.get_palf_handle_impl()->get_max_scn(); EXPECT_EQ(OB_SUCCESS, submit_log(new_leader, 100, leader_idx)); wait_until_has_committed(new_leader, new_leader.palf_handle_impl_->sw_.get_max_lsn()); EXPECT_EQ(OB_ITER_END, read_log(new_leader)); switch_append_to_flashback(new_leader, 
mode_version); // flashback到padding日志头后重启 EXPECT_EQ(OB_SUCCESS, new_leader.palf_handle_impl_->flashback(mode_version, padding_scn.minus(padding_scn, 1), timeout_ts_us)); EXPECT_EQ(true, 0 != lsn_2_offset(new_leader.get_palf_handle_impl()->get_max_lsn(), PALF_BLOCK_SIZE)); new_leader.reset(); } EXPECT_EQ(OB_SUCCESS, restart_paxos_groups()); // 重启提交日志,不产生padding日志 { PalfHandleImplGuard new_leader; EXPECT_EQ(OB_SUCCESS, get_leader(id, new_leader, leader_idx)); LSN padding_start_lsn = new_leader.get_palf_handle_impl()->get_max_lsn(); EXPECT_EQ(true, 0 != lsn_2_offset(new_leader.get_palf_handle_impl()->get_max_lsn(), PALF_BLOCK_SIZE)); const int64_t remained_size = PALF_BLOCK_SIZE - lsn_2_offset(new_leader.get_palf_handle_impl()->get_max_lsn(), PALF_BLOCK_SIZE); EXPECT_GE(remained_size, 0); const int64_t group_entry_body_size = remained_size - LogGroupEntryHeader::HEADER_SER_SIZE - LogEntryHeader::HEADER_SER_SIZE; PALF_LOG(INFO, "runlin trace print remained_size", K(remained_size), K(group_entry_body_size)); switch_flashback_to_append(new_leader, mode_version); EXPECT_EQ(OB_SUCCESS, submit_log(new_leader, 1, leader_idx, group_entry_body_size)); EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(new_leader, new_leader.get_palf_handle_impl()->get_max_lsn())); PalfBufferIterator iterator; EXPECT_EQ(OB_SUCCESS, new_leader.get_palf_handle_impl()->alloc_palf_buffer_iterator(padding_start_lsn, iterator)); EXPECT_EQ(OB_SUCCESS, iterator.next()); LogEntry log_entry; LSN check_lsn; EXPECT_EQ(OB_SUCCESS, iterator.get_entry(log_entry, check_lsn)); EXPECT_EQ(check_lsn, padding_start_lsn + LogGroupEntryHeader::HEADER_SER_SIZE); EXPECT_EQ(false, log_entry.header_.is_padding_log_()); EXPECT_EQ(true, log_entry.check_integrity()); new_leader.reset(); } EXPECT_EQ(OB_SUCCESS, restart_paxos_groups()); // 重启后继续提交日志 { PalfHandleImplGuard new_leader; EXPECT_EQ(OB_SUCCESS, get_leader(id, new_leader, leader_idx)); EXPECT_EQ(true, 0 == lsn_2_offset(new_leader.get_palf_handle_impl()->get_max_lsn(), 
PALF_BLOCK_SIZE)); EXPECT_EQ(OB_SUCCESS, submit_log(new_leader, 100, leader_idx, 1000)); EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(new_leader, new_leader.get_palf_handle_impl()->get_max_lsn())); EXPECT_EQ(OB_ITER_END, read_log(new_leader)); } delete_paxos_group(id); } TEST_F(TestObSimpleLogClusterSingleReplica, test_truncate_failed) { SET_CASE_LOG_FILE(TEST_NAME, "test_truncate_failed"); int64_t id = ATOMIC_AAF(&palf_id_, 1); int64_t leader_idx = 0; char block_path[OB_MAX_FILE_NAME_LENGTH] = {'\0'}; int64_t file_size = 0; { PalfHandleImplGuard leader; EXPECT_EQ(OB_SUCCESS, create_paxos_group(id, leader_idx, leader)); EXPECT_EQ(OB_SUCCESS, submit_log(leader, 10, id, 1000)); wait_lsn_until_flushed(leader.palf_handle_impl_->get_max_lsn(), leader); LSN max_lsn = leader.palf_handle_impl_->log_engine_.log_storage_.log_tail_; EXPECT_EQ(OB_SUCCESS, submit_log(leader, 10, id, 1000)); wait_lsn_until_flushed(leader.palf_handle_impl_->get_max_lsn(), leader); int64_t fd = leader.palf_handle_impl_->log_engine_.log_storage_.block_mgr_.curr_writable_handler_.io_fd_.second_id_; block_id_t block_id = leader.palf_handle_impl_->log_engine_.log_storage_.block_mgr_.curr_writable_block_id_; char *log_dir = leader.palf_handle_impl_->log_engine_.log_storage_.block_mgr_.log_dir_; convert_to_normal_block(log_dir, block_id, block_path, OB_MAX_FILE_NAME_LENGTH); EXPECT_EQ(OB_ITER_END, read_log(leader)); PALF_LOG_RET(ERROR, OB_SUCCESS, "truncate pos", K(max_lsn)); EXPECT_EQ(0, ftruncate(fd, max_lsn.val_+MAX_INFO_BLOCK_SIZE)); FileDirectoryUtils::get_file_size(block_path, file_size); EXPECT_EQ(file_size, max_lsn.val_+MAX_INFO_BLOCK_SIZE); } PalfHandleImplGuard leader; EXPECT_EQ(OB_SUCCESS, restart_paxos_groups());; FileDirectoryUtils::get_file_size(block_path, file_size); EXPECT_EQ(file_size, PALF_PHY_BLOCK_SIZE); get_leader(id, leader, leader_idx); EXPECT_EQ(OB_SUCCESS, submit_log(leader, 32, id, MAX_LOG_BODY_SIZE)); wait_lsn_until_flushed(leader.palf_handle_impl_->get_max_lsn(), leader); 
EXPECT_EQ(OB_ITER_END, read_log(leader)); // 验证truncate文件尾后重启 EXPECT_EQ(OB_SUCCESS, leader.palf_handle_impl_->log_engine_.truncate(LSN(PALF_BLOCK_SIZE))); EXPECT_EQ(LSN(PALF_BLOCK_SIZE), leader.palf_handle_impl_->log_engine_.log_storage_.log_tail_); leader.reset(); EXPECT_EQ(OB_SUCCESS, restart_paxos_groups()); } TEST_F(TestObSimpleLogClusterSingleReplica, test_meta) { SET_CASE_LOG_FILE(TEST_NAME, "test_meta"); int64_t id = ATOMIC_AAF(&palf_id_, 1); int64_t leader_idx = 0; LSN upper_aligned_log_tail; { PalfHandleImplGuard leader; EXPECT_EQ(OB_SUCCESS, create_paxos_group(id, leader_idx, leader)); sleep(1); // 测试meta文件刚好写满的重启场景 LogEngine *log_engine = &leader.palf_handle_impl_->log_engine_; LogStorage *log_meta_storage = &log_engine->log_meta_storage_; LSN log_meta_tail = log_meta_storage->log_tail_; upper_aligned_log_tail.val_ = (lsn_2_block(log_meta_tail, PALF_META_BLOCK_SIZE) + 1) * PALF_META_BLOCK_SIZE; int64_t delta = upper_aligned_log_tail - log_meta_tail; int64_t delta_cnt = delta / MAX_META_ENTRY_SIZE; while (delta_cnt-- > 0) { log_engine->append_log_meta_(log_engine->log_meta_); } EXPECT_EQ(upper_aligned_log_tail, log_meta_storage->log_tail_); PALF_LOG_RET(ERROR, OB_SUCCESS, "runlin trace before restart", K(upper_aligned_log_tail), KPC(log_meta_storage)); } EXPECT_EQ(OB_SUCCESS, restart_paxos_groups()); { PalfHandleImplGuard leader; EXPECT_EQ(OB_SUCCESS, get_leader(id, leader, leader_idx)); LogEngine *log_engine = &leader.palf_handle_impl_->log_engine_; LogStorage *log_meta_storage = &log_engine->log_meta_storage_; LSN log_meta_tail = log_meta_storage->log_tail_; upper_aligned_log_tail.val_ = (lsn_2_block(log_meta_tail, PALF_META_BLOCK_SIZE) + 1) * PALF_META_BLOCK_SIZE; int64_t delta = upper_aligned_log_tail - log_meta_tail; int64_t delta_cnt = delta / MAX_META_ENTRY_SIZE; while (delta_cnt-- > 0) { log_engine->append_log_meta_(log_engine->log_meta_); } EXPECT_EQ(upper_aligned_log_tail, log_meta_storage->log_tail_); EXPECT_EQ(OB_SUCCESS, submit_log(leader, 
32, id, MAX_LOG_BODY_SIZE));
    sleep(1);
    wait_lsn_until_flushed(leader.palf_handle_impl_->get_max_lsn(), leader);
    block_id_t min_block_id, max_block_id;
    EXPECT_EQ(OB_SUCCESS, log_meta_storage->get_block_id_range(min_block_id, max_block_id));
    EXPECT_EQ(min_block_id, max_block_id);
  }
}

// test_iterator: exercises PalfBufferIterator over the log storage of a PALF group whose access
// mode is switched RAW_WRITE -> APPEND -> RAW_WRITE while the same iterator is in use, covering:
//   - next(replayable_point_scn), including how mode_version changes affect the iterator cache;
//   - next(replayable_point_scn, next_log_min_scn, iterate_end_by_replayable_point), checking
//     next_log_min_scn, iterate_end_by_replayable_point and prev_entry_scn_ when iteration stops
//     at the file end or at the replayable point;
//   - re-reading all logs of group `id` from LSN(0) after restart_paxos_groups().
// Logs are written to `leader` and copied into `raw_write_leader` with
// read_and_submit_group_log(); the iterator reads raw_write_leader's log storage.
// The file end and mode version seen by the iterator are driven by end_lsn_v / mode_version_v,
// which the get_file_end_lsn / get_mode_version lambdas dereference on every call.
TEST_F(TestObSimpleLogClusterSingleReplica, test_iterator) {
  SET_CASE_LOG_FILE(TEST_NAME, "test_iterator");
  OB_LOGGER.set_log_level("TRACE");
  const int64_t id = ATOMIC_AAF(&palf_id_, 1);
  int64_t leader_idx = 0;
  // Values the iterator observes through the lambdas passed to iterator.init(); the test changes
  // them between next() calls to simulate mode-version changes and a moving file end.
  int64_t mode_version_v = 1;
  int64_t *mode_version = &mode_version_v;
  LSN end_lsn_v = LSN(100000000);
  LSN *end_lsn = &end_lsn_v;
  {
    SCN max_scn_case1, max_scn_case2, max_scn_case3;
    PalfHandleImplGuard leader;
    PalfHandleImplGuard raw_write_leader;
    EXPECT_EQ(OB_SUCCESS, create_paxos_group(id, leader_idx, leader));
    PalfHandleImpl *palf_handle_impl = leader.palf_handle_impl_;
    const int64_t id_raw_write = ATOMIC_AAF(&palf_id_, 1);
    EXPECT_EQ(OB_SUCCESS, create_paxos_group(id_raw_write, leader_idx, raw_write_leader));
    EXPECT_EQ(OB_SUCCESS, change_access_mode_to_raw_write(raw_write_leader));
    int64_t count = 5;
    // Batch 1 (case1): submit `count` (5) logs and record max_scn for the later next() checks.
    for (int i = 0; i < count; i++) {
      EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, id, 4*1024));
      EXPECT_EQ(OB_SUCCESS, wait_lsn_until_flushed(leader.palf_handle_impl_->get_max_lsn(), leader));
    }
    max_scn_case1 = palf_handle_impl->get_max_scn();
    // Batch 2 (case2): 5 more logs, read once case1 has passed.
    for (int i = 0; i < count; i++) {
      EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, id, 4*1024));
      EXPECT_EQ(OB_SUCCESS, wait_lsn_until_flushed(leader.palf_handle_impl_->get_max_lsn(), leader));
    }
    max_scn_case2 = palf_handle_impl->get_max_scn();
    // Batch 3 (case3): 5 more logs, used to verify
    // next(replayable_point_scn, &next_log_min_scn, &bool).
    // NOTE(review): in this copy of the file the std::vector declarations here and below show no
    // template arguments (presumably std::vector<LSN> / std::vector<SCN>, judging by how the
    // elements are used); confirm against version control.
    std::vector lsns;
    std::vector logts;
    const int64_t log_size = 500;
    // Submits `count` single-entry logs of wanted_data_size to `leader`, each with
    // ref_scn = current_time() + 10000000. After each successful submit it waits for the max LSN
    // to flush and records the log's LSN/SCN in lsn_array / scn_array (both resized to `count`).
    // Stops at, and returns, the first error from submit_log_impl.
    auto submit_log_private = [this](PalfHandleImplGuard &leader,
                                     const int64_t count,
                                     const int64_t id,
                                     const int64_t wanted_data_size,
                                     std::vector &lsn_array,
                                     std::vector &scn_array) -> int {
      int ret = OB_SUCCESS;
      lsn_array.resize(count);
      scn_array.resize(count);
      for (int i = 0; i < count && OB_SUCC(ret); i++) {
        SCN ref_scn;
        ref_scn.convert_from_ts(ObTimeUtility::current_time() + 10000000);
        std::vector tmp_lsn_array;
        std::vector tmp_log_scn_array;
        if (OB_FAIL(submit_log_impl(leader, 1, id, wanted_data_size, ref_scn, tmp_lsn_array, tmp_log_scn_array))) {
        } else {
          lsn_array[i] = tmp_lsn_array[0];
          scn_array[i] = tmp_log_scn_array[0];
          wait_lsn_until_flushed(leader.palf_handle_impl_->get_max_lsn(), leader);
          CLOG_LOG(INFO, "submit_log_private success", K(i), "scn", tmp_log_scn_array[0], K(ref_scn));
        }
      }
      return ret;
    };
    EXPECT_EQ(OB_SUCCESS, submit_log_private(leader, count, id, log_size, lsns, logts));
    max_scn_case3 = palf_handle_impl->get_max_scn();
    EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(leader, palf_handle_impl->get_end_lsn()));
    // Copy leader's group entries into raw_write_leader (RAW_WRITE mode) and wait for them to
    // commit; the iterator below reads raw_write_leader's log storage.
    EXPECT_EQ(OB_ITER_END, read_and_submit_group_log(leader, raw_write_leader));
    PalfHandleImpl *raw_write_palf_handle_impl = raw_write_leader.palf_handle_impl_;
    EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(raw_write_leader, raw_write_palf_handle_impl->get_end_lsn()));
    PalfBufferIterator iterator;
    auto get_file_end_lsn = [&end_lsn]() -> LSN { return *end_lsn; };
    auto get_mode_version = [&mode_version, &mode_version_v]() -> int64_t {
      PALF_LOG(INFO, "runlin trace", K(*mode_version), K(mode_version_v));
      return *mode_version;
    };
    EXPECT_EQ(OB_SUCCESS, iterator.init(LSN(0), get_file_end_lsn, get_mode_version, &raw_write_palf_handle_impl->log_engine_.log_storage_));
    EXPECT_EQ(OB_SUCCESS, iterator.next(max_scn_case1));
    count--;
    EXPECT_EQ(OB_ITER_END, iterator.next(SCN::base_scn()));
    // case0: verify log iteration with the group iterator.
    EXPECT_EQ(OB_ITER_END, read_group_log(raw_write_leader, LSN(0)));
    LSN curr_lsn = iterator.iterator_impl_.get_curr_read_lsn();
    // case1:
    //   - verify whether the cache is cleared after mode_version changes;
    //   - verify that replayable_point_scn takes effect.
    // When the mode version changes, the cache is expected to be cleared.
    // In raw-write mode, a very small replayable_point_scn makes next() return OB_ITER_END directly.
    PALF_LOG(INFO, "runlin trace case1", K(mode_version_v), K(*mode_version), K(max_scn_case1));
    // An invalid mode_version_v: the cache is expected NOT to be cleared.
    mode_version_v = INVALID_PROPOSAL_ID;
    end_lsn_v = curr_lsn;
    EXPECT_FALSE(curr_lsn == iterator.iterator_storage_.end_lsn_);
    EXPECT_FALSE(curr_lsn == iterator.iterator_storage_.start_lsn_);
    EXPECT_EQ(OB_ITER_END, iterator.next(SCN::base_scn()));
    // mode_version_v smaller than the initial mode version: the cache is expected NOT to be cleared.
    mode_version_v = -1;
    EXPECT_FALSE(curr_lsn == iterator.iterator_storage_.end_lsn_);
    EXPECT_FALSE(curr_lsn == iterator.iterator_storage_.start_lsn_);
    EXPECT_EQ(OB_ITER_END, iterator.next(SCN::base_scn()));
    // A valid, larger mode_version_v: the cache is cleared.
    mode_version_v = 100;
    end_lsn_v = curr_lsn;
    EXPECT_EQ(OB_ITER_END, iterator.next(SCN::base_scn()));
    // The cache was cleared by the preceding next() call.
    EXPECT_EQ(curr_lsn, iterator.iterator_storage_.start_lsn_);
    EXPECT_EQ(curr_lsn, iterator.iterator_storage_.end_lsn_);
    PALF_LOG(INFO, "runlin trace", K(iterator), K(max_scn_case1), K(curr_lsn));
    end_lsn_v = LSN(1000000000);
    // With replayable_point_scn == max_scn_case1, every log of batch 1 can be returned
    // (5 in total; the first one was already consumed above).
    EXPECT_EQ(OB_SUCCESS, iterator.next(max_scn_case1));
    count--;
    while (count > 0) {
      EXPECT_EQ(OB_SUCCESS, iterator.next(max_scn_case1));
      count--;
    }
    EXPECT_EQ(OB_ITER_END, iterator.next(max_scn_case1));
    // case2: check that next() works; read the following 5 logs.
    count = 5;
    PALF_LOG(INFO, "runlin trace case2", K(iterator), K(max_scn_case2));
    while (count > 0) {
      EXPECT_EQ(OB_SUCCESS, iterator.next(max_scn_case2));
      count--;
    }
    // curr_entry is now the first log of batch 3 (first_log). Its scn is larger than
    // max_scn_case2, so it is not returned.
    // NB: while writing this test, the EXPECT_EQ after case3's first next() once hit this:
    // after curr_entry became first_log, a later step moved file_end_lsn before first_log, and
    // next(first_log_ts, next_log_min_scn) then set next_log_min_scn to first_scn+1. Externally,
    // the caller was told next_log_min_scn > first_scn before it had ever seen first_log, even
    // though first_log exists.
    //
    // In practice this cannot happen, because file_end_lsn never moves backwards.
    EXPECT_EQ(OB_ITER_END, iterator.next(max_scn_case2));
    // case3: next(replayable_point_scn, &next_log_min_scn)
    PALF_LOG(INFO, "runlin trace case3", K(iterator), K(max_scn_case3), K(end_lsn_v), K(max_scn_case2));
    SCN first_scn = logts[0];
    // When using next(replayable_point_scn, &next_log_min_scn), a LogEntry header must not be
    // used as the iterator's end point.
    LSN first_log_start_lsn = lsns[0] - sizeof(LogGroupEntryHeader);
    LSN first_log_end_lsn = lsns[0]+log_size+sizeof(LogEntryHeader);
    SCN next_log_min_scn;
    bool iterate_end_by_replayable_point = false;
    count = 5;
    // Simulate hitting the file end early: no new log has been read, so next_log_min_scn is
    // expected to be prev_entry_scn_ + 1.
    end_lsn_v = first_log_start_lsn - 1;
    CLOG_LOG(INFO, "runlin trace 1", K(iterator), K(end_lsn_v), KPC(end_lsn), K(max_scn_case2), K(first_scn));
    EXPECT_EQ(OB_ITER_END, iterator.next(SCN::plus(first_scn, 10000), next_log_min_scn, iterate_end_by_replayable_point));
    // file_end_lsn moved back and curr_entry_ has not been read, so the check below expects
    // next_log_min_scn == prev_entry_scn_ + 1, and the stop is not attributed to the replayable point.
    EXPECT_EQ(SCN::plus(iterator.iterator_impl_.prev_entry_scn_, 1), next_log_min_scn);
    EXPECT_EQ(iterate_end_by_replayable_point, false);
    CLOG_LOG(INFO, "runlin trace 3.1", K(iterator), K(end_lsn_v), KPC(end_lsn));
    EXPECT_EQ(first_log_start_lsn, iterator.iterator_impl_.log_storage_->get_lsn(iterator.iterator_impl_.curr_read_pos_));
    // Reading one log successfully resets next_log_min_scn;
    // curr_entry is the log for first_scn.
    end_lsn_v = first_log_end_lsn;
    CLOG_LOG(INFO, "runlin trace 2", K(iterator), K(end_lsn_v), KPC(end_lsn));
    EXPECT_EQ(OB_SUCCESS, iterator.next(first_scn, next_log_min_scn, iterate_end_by_replayable_point));
    count--;
    // On success, next_log_min_scn is expected to be invalid.
    EXPECT_EQ(next_log_min_scn.is_valid(), false);
    // The iterator's prev_entry_scn_ is now first_scn.
    EXPECT_EQ(iterator.iterator_impl_.prev_entry_scn_, first_scn);
    CLOG_LOG(INFO, "runlin trace 3", K(iterator), K(end_lsn_v), KPC(end_lsn));
    {
      // Simulate hitting the file end early; the file end is now first_log_end_lsn.
      // next_log_min_scn is expected to be first_scn + 1.
      SCN second_scn = logts[1];
      EXPECT_EQ(OB_ITER_END, iterator.next(second_scn, next_log_min_scn, iterate_end_by_replayable_point));
      // OB_ITER_END, with next_log_min_scn == first_scn + 1.
      EXPECT_EQ(next_log_min_scn, SCN::plus(first_scn, 1));
      EXPECT_EQ(iterate_end_by_replayable_point, false);
      CLOG_LOG(INFO, "runlin trace 3", K(iterator), K(end_lsn_v), KPC(end_lsn), K(first_scn), K(second_scn));
      // Calling next() again: next_log_min_scn is still expected to be first_scn + 1.
      EXPECT_EQ(OB_ITER_END, iterator.next(second_scn, next_log_min_scn, iterate_end_by_replayable_point));
      // OB_ITER_END, with next_log_min_scn == first_scn + 1.
      EXPECT_EQ(next_log_min_scn, SCN::plus(first_scn, 1));
    }
    CLOG_LOG(INFO, "runlin trace 4", K(iterator), K(end_lsn_v), KPC(end_lsn));
    SCN prev_next_success_scn;
    // Simulate reaching replayable_point_scn with the file end at the end of the second log;
    // next_log_min_scn is expected to be replayable_point_scn + 1.
    // Here replayable_point_scn < the scn of the cached log.
    {
      SCN second_scn = logts[1];
      SCN replayable_point_scn = SCN::minus(second_scn, 1);
      end_lsn_v = lsns[1]+log_size+sizeof(LogEntryHeader);
      EXPECT_EQ(OB_ITER_END, iterator.next(replayable_point_scn, next_log_min_scn, iterate_end_by_replayable_point));
      EXPECT_EQ(iterate_end_by_replayable_point, true);
      // OB_ITER_END, with next_log_min_scn == replayable_point_scn + 1.
      PALF_LOG(INFO, "runliun trace 4.1", K(replayable_point_scn), K(next_log_min_scn), K(iterator));
      EXPECT_EQ(next_log_min_scn, SCN::plus(replayable_point_scn, 1));
      // Calling next() again: next_log_min_scn is still expected to be replayable_point_scn + 1.
      EXPECT_EQ(OB_ITER_END, iterator.next(replayable_point_scn, next_log_min_scn, iterate_end_by_replayable_point));
      // OB_ITER_END, with next_log_min_scn == replayable_point_scn + 1.
      EXPECT_EQ(next_log_min_scn, SCN::plus(replayable_point_scn, 1));
      EXPECT_EQ(iterate_end_by_replayable_point, true);
      // With replayable_point_scn raised to second_scn, the second log is returned.
      EXPECT_EQ(OB_SUCCESS, iterator.next(second_scn, next_log_min_scn, iterate_end_by_replayable_point));
      count--;
      EXPECT_EQ(next_log_min_scn.is_valid(), false);
      prev_next_success_scn = iterator.iterator_impl_.prev_entry_scn_;
      EXPECT_EQ(prev_next_success_scn, second_scn);
    }
    // Simulate a file end LSN that is not the end of a group entry.
    {
      // Put the file end inside the third log's LogEntry (10 bytes past lsns[2]).
      end_lsn_v = lsns[2]+10;
      // Use the third log's scn.
      SCN third_scn = logts[2];
      SCN replayable_point_scn = SCN::plus(third_scn, 10);
      CLOG_LOG(INFO, "runlin trace 5.1", K(iterator), K(end_lsn_v), KPC(end_lsn), K(replayable_point_scn));
      // The log cached in memory is now the third log. The iterator has read a new log, but that
      // log is unreadable because of end_lsn. (Since this log is not held back by controlled
      // replay, curr_read_pos_ is advanced by 56.) Therefore next_log_min_scn is set to third_scn.
      EXPECT_EQ(OB_ITER_END, iterator.next(replayable_point_scn, next_log_min_scn, iterate_end_by_replayable_point));
      CLOG_LOG(INFO, "runlin trace 5.1.1", K(iterator), K(next_log_min_scn), K(replayable_point_scn));
      EXPECT_EQ(next_log_min_scn, third_scn);
      EXPECT_EQ(iterate_end_by_replayable_point, false);
      // Verify the third log cannot be returned because of controlled replay. (replayable_point_scn
      // moving backwards cannot happen in practice; it is simulated here on purpose.)
      replayable_point_scn = SCN::minus(third_scn, 4);
      EXPECT_EQ(OB_ITER_END, iterator.next(replayable_point_scn, next_log_min_scn, iterate_end_by_replayable_point));
      // Since there can be no log between replayable_point_scn and curr_entry_, and replayable_point_scn
    // NOTE(review): the rest of the comment above, and some code, appear to be missing from this
    // copy of the file. The text from a '<' after "replayable_point_scn" up to a later '>' seems
    // to have been stripped. By brace count, the lost span must at least have closed the scope
    // opened for "file end LSN is not the end of a group entry". It also began the loop whose
    // surviving tail ("0) {") follows — presumably `while (count > 0) {`, draining the remaining
    // batch-3 logs up to max_scn_case3. Restore from version control; the fragment is kept
    // verbatim here.
    0) {
      EXPECT_EQ(OB_SUCCESS, iterator.next(max_scn_case3, next_log_min_scn, iterate_end_by_replayable_point));
      prev_next_success_scn = iterator.iterator_impl_.prev_entry_scn_;
      EXPECT_EQ(false, next_log_min_scn.is_valid());
      count--;
    }
    CLOG_LOG(INFO, "runlin trace 6.1", K(iterator), K(end_lsn_v), K(max_scn_case3));
    // No readable log remains on disk or after the replayable point, so next_log_min_scn is
    // expected to be the replayable point + 1.
    EXPECT_EQ(OB_ITER_END, iterator.next(max_scn_case3, next_log_min_scn, iterate_end_by_replayable_point));
    EXPECT_EQ(SCN::plus(max_scn_case3, 1), next_log_min_scn);
    EXPECT_EQ(max_scn_case3, prev_next_success_scn);
    CLOG_LOG(INFO, "runlin trace 6.2", K(iterator), K(end_lsn_v), K(max_scn_case3), "end_lsn_of_leader", raw_write_leader.palf_handle_impl_->get_max_lsn());
    // Switch raw_write_leader from RAW_WRITE to APPEND, write more logs, and check that
    // iteration still behaves correctly after the switch.
    {
      std::vector logts_append;
      std::vector lsns_append;
      int count_append = 5;
      EXPECT_EQ(OB_SUCCESS, change_access_mode_to_append(raw_write_leader));
      PALF_LOG(INFO, "runlin trace 6.3", "raw_write_leader_lsn", raw_write_leader.palf_handle_impl_->get_max_lsn(), "new_leader_lsn", leader.palf_handle_impl_->get_max_lsn());
      // Both groups get 5 more logs. The second call overwrites lsns_append / logts_append, so
      // from here on they hold raw_write_leader's LSNs/SCNs (the group the iterator reads).
      EXPECT_EQ(OB_SUCCESS, submit_log_private(leader, count_append, id, log_size, lsns_append, logts_append));
      EXPECT_EQ(OB_SUCCESS, submit_log_private(raw_write_leader, count_append, id, log_size, lsns_append, logts_append));
      EXPECT_EQ(OB_SUCCESS, wait_lsn_until_flushed(leader.palf_handle_impl_->get_max_lsn(), leader));
      EXPECT_EQ(OB_SUCCESS, wait_lsn_until_flushed(raw_write_leader.palf_handle_impl_->get_max_lsn(), raw_write_leader));
      PALF_LOG(INFO, "runlin trace 6.4", "raw_write_leader_lsn", raw_write_leader.palf_handle_impl_->get_max_lsn(), "new_leader_lsn", leader.palf_handle_impl_->get_max_lsn());
      // case 7: set end_lsn_v to a large value so the data gets loaded into memory (the original
      // note mentions ~2M). NB: if the data is not loaded into memory, reads may fail with
      // OB_NEED_RETRY.
      // replayable_point_scn is the first appended log's scn - 2.
      // NOTE(review): the original comment expected this next() to be held back by controlled
      // replay with prev_entry_scn_ unchanged, but the assertion below expects OB_SUCCESS —
      // presumably because controlled replay does not apply to APPEND-mode logs (see below).
      // Confirm the intent.
      end_lsn_v = LSN(1000000000);
      SCN replayable_point_scn = SCN::minus(logts_append[0], 2);
      EXPECT_EQ(OB_SUCCESS, iterator.next(replayable_point_scn, next_log_min_scn, iterate_end_by_replayable_point));
      count_append--;
      prev_next_success_scn = iterator.iterator_impl_.prev_entry_scn_;
      end_lsn_v = lsns_append[1]+2;
      // curr_entry_ is now the second log: valid, but unreadable because of the file end LSN.
      // Controlled replay does not apply to APPEND-mode logs.
      replayable_point_scn = SCN::plus(raw_write_leader.palf_handle_impl_->get_max_scn(), 1000000);
      EXPECT_EQ(OB_ITER_END, iterator.next(replayable_point_scn, next_log_min_scn, iterate_end_by_replayable_point));
      PALF_LOG(INFO, "runlin trace 7.1", K(iterator), K(replayable_point_scn), K(end_lsn_v), K(logts_append[1]), K(replayable_point_scn));
      EXPECT_EQ(next_log_min_scn, logts_append[1]);
      EXPECT_EQ(prev_next_success_scn, iterator.iterator_impl_.prev_entry_scn_);
      EXPECT_EQ(iterate_end_by_replayable_point, false);
      PALF_LOG(INFO, "runlin trace 7.1.1", K(iterator), K(replayable_point_scn), K(end_lsn_v), K(logts_append[1]));
      EXPECT_EQ(OB_ITER_END, iterator.next(replayable_point_scn, next_log_min_scn, iterate_end_by_replayable_point));
      EXPECT_EQ(next_log_min_scn, logts_append[1]);
      EXPECT_EQ(prev_next_success_scn, iterator.iterator_impl_.prev_entry_scn_);
      // replayable_point_scn moving backwards cannot happen in practice, but the iterator must
      // not rely on that.
      // Move replayable_point_scn back to a very small value; next_log_min_scn is expected to be
      // prev_next_success_scn + 1.
      // Simulates replayable_point_scn < prev_entry_.
      replayable_point_scn.convert_for_tx(100);
      PALF_LOG(INFO, "runlin trace 7.2", K(iterator), K(replayable_point_scn), K(end_lsn_v), K(logts_append[0]));
      EXPECT_EQ(OB_ITER_END, iterator.next(replayable_point_scn, next_log_min_scn, iterate_end_by_replayable_point));
      EXPECT_EQ(SCN::plus(prev_next_success_scn, 1), next_log_min_scn);
      EXPECT_EQ(prev_next_success_scn, iterator.iterator_impl_.prev_entry_scn_);
      EXPECT_EQ(iterate_end_by_replayable_point, false);
      // Iterate again: same result.
      EXPECT_EQ(OB_ITER_END, iterator.next(replayable_point_scn, next_log_min_scn, iterate_end_by_replayable_point));
      EXPECT_EQ(SCN::plus(prev_next_success_scn, 1), next_log_min_scn);
      EXPECT_EQ(prev_next_success_scn, iterator.iterator_impl_.prev_entry_scn_);
      // replayable_point_scn lies between prev_next_success_scn and the second appended log;
      // next_log_min_scn is expected to be replayable_point_scn + 1.
      // Simulates replayable_point_scn within [prev_entry_, curr_entry_].
      replayable_point_scn = SCN::minus(logts_append[1], 4);
      PALF_LOG(INFO, "runlin trace 7.3", K(iterator), K(replayable_point_scn), K(end_lsn_v), K(logts_append[1]), K(prev_next_success_scn));
      EXPECT_EQ(OB_ITER_END, iterator.next(replayable_point_scn, next_log_min_scn, iterate_end_by_replayable_point));
      EXPECT_EQ(next_log_min_scn, SCN::plus(replayable_point_scn, 1));
      // No log exists between replayable_point_scn and curr_entry_, so prev_entry_scn_ is pushed
      // up to replayable_point_scn.
      EXPECT_EQ(replayable_point_scn, iterator.iterator_impl_.prev_entry_scn_);
      // Iterate again.
      EXPECT_EQ(OB_ITER_END, iterator.next(replayable_point_scn, next_log_min_scn, iterate_end_by_replayable_point));
      EXPECT_EQ(next_log_min_scn, SCN::plus(replayable_point_scn, 1));
      // No log exists between replayable_point_scn and curr_entry_, so prev_entry_scn_ is pushed
      // up to replayable_point_scn.
      EXPECT_EQ(replayable_point_scn, iterator.iterator_impl_.prev_entry_scn_);
      // Verify that iterating the appended logs succeeds.
      end_lsn_v = lsns_append[2]+2;
      replayable_point_scn = logts_append[0];
      EXPECT_EQ(OB_SUCCESS, iterator.next(replayable_point_scn, next_log_min_scn, iterate_end_by_replayable_point));
      EXPECT_EQ(false, next_log_min_scn.is_valid());
      EXPECT_EQ(logts_append[1], iterator.iterator_impl_.prev_entry_scn_);
      count_append--;
      prev_next_success_scn = logts_append[1];
      // A large replayable_point_scn: next_log_min_scn is expected to be logts_append[2].
      replayable_point_scn.convert_from_ts(ObTimeUtility::current_time() + 100000000);
      PALF_LOG(INFO, "runlin trace 7.4", K(iterator), K(replayable_point_scn), K(end_lsn_v), K(logts_append[2]), K(prev_next_success_scn));
      EXPECT_EQ(OB_ITER_END, iterator.next(replayable_point_scn, next_log_min_scn, iterate_end_by_replayable_point));
      EXPECT_EQ(next_log_min_scn, logts_append[2]);
      // Iterate again: same result.
      EXPECT_EQ(OB_ITER_END, iterator.next(replayable_point_scn, next_log_min_scn, iterate_end_by_replayable_point));
      EXPECT_EQ(next_log_min_scn, logts_append[2]);
      EXPECT_EQ(iterate_end_by_replayable_point, false);
      // Move replayable_point_scn back: next_log_min_scn is expected to be prev_next_success_scn + 1.
      replayable_point_scn.convert_for_tx(100);
      EXPECT_EQ(OB_ITER_END, iterator.next(replayable_point_scn, next_log_min_scn, iterate_end_by_replayable_point));
      EXPECT_EQ(SCN::plus(prev_next_success_scn, 1), next_log_min_scn);
      EXPECT_EQ(iterate_end_by_replayable_point, false);
      end_lsn_v = LSN(1000000000);
      replayable_point_scn.convert_from_ts(ObTimeUtility::current_time() + 100000000);
      // Read the remaining appended logs except one (leave one log unread).
      while (count_append > 1) {
        EXPECT_EQ(OB_SUCCESS, iterator.next(replayable_point_scn, next_log_min_scn, iterate_end_by_replayable_point));
        EXPECT_EQ(false, next_log_min_scn.is_valid());
        prev_next_success_scn = iterator.iterator_impl_.prev_entry_scn_;
        count_append--;
      }
      // Verify that switching from APPEND back to RAW_WRITE works.
      {
        // NOTE(review): id3 is not used in this scope.
        int64_t id3 = ATOMIC_AAF(&palf_id_, 1);
        // These shadow the outer logts_append / lsns_append / count_append; below they hold the
        // LSNs/SCNs of the logs submitted to `leader`.
        std::vector logts_append;
        std::vector lsns_append;
        int count_append = 5;
        PALF_LOG(INFO, "runlin trace 8.1.0", "raw_write_leader_lsn", raw_write_leader.palf_handle_impl_->get_max_lsn(), "new_leader_lsn", leader.palf_handle_impl_->get_max_lsn());
        EXPECT_EQ(OB_SUCCESS, submit_log_private(leader, count_append, id, log_size, lsns_append, logts_append));
        // NOTE(review): max_scn_case4 is not used in this scope.
        SCN max_scn_case4 = leader.palf_handle_impl_->get_max_scn();
        EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(leader, leader.palf_handle_impl_->get_max_lsn()));
        EXPECT_EQ(OB_SUCCESS, change_access_mode_to_raw_write(raw_write_leader));
        PALF_LOG(INFO, "runlin trace 8.1", "raw_write_leader_lsn", raw_write_leader.palf_handle_impl_->get_max_lsn(), "new_leader_lsn", leader.palf_handle_impl_->get_max_lsn());
        // Copy leader's group entries into raw_write_leader; presumably starting from
        // raw_write_leader's current max LSN (third argument) — confirm against the helper.
        EXPECT_EQ(OB_ITER_END, read_and_submit_group_log(leader, raw_write_leader, raw_write_leader.palf_handle_impl_->get_max_lsn()));
        EXPECT_EQ(OB_SUCCESS, wait_lsn_until_flushed(raw_write_leader.palf_handle_impl_->get_max_lsn(), raw_write_leader));
        PALF_LOG(INFO, "runlin trace 8.2", "raw_write_leader_lsn", raw_write_leader.palf_handle_impl_->get_max_lsn(), "new_leader_lsn", leader.palf_handle_impl_->get_max_lsn());
        // A small replayable_point_scn.
        SCN replayable_point_scn;
        replayable_point_scn.convert_for_tx(100);
        PALF_LOG(INFO, "runlin trace 8.3", K(iterator), K(replayable_point_scn), K(end_lsn_v), K(logts_append[0]), K(prev_next_success_scn));
        // This reads the log left over from the previous round, so count_append is not decremented.
        EXPECT_EQ(OB_SUCCESS, iterator.next(replayable_point_scn, next_log_min_scn, iterate_end_by_replayable_point));
        prev_next_success_scn = iterator.iterator_impl_.prev_entry_scn_;
        // Nothing can be returned past the replayable point, so next_log_min_scn should be
        // prev_next_success_scn + 1.
        EXPECT_EQ(OB_ITER_END, iterator.next(replayable_point_scn, next_log_min_scn, iterate_end_by_replayable_point));
        EXPECT_EQ(SCN::plus(prev_next_success_scn, 1), next_log_min_scn);
        PALF_LOG(INFO, "runlin trace 8.3.1", K(iterator), K(replayable_point_scn), K(end_lsn_v), K(logts_append[0]), K(prev_next_success_scn));
        EXPECT_EQ(OB_ITER_END, iterator.next(replayable_point_scn, next_log_min_scn, iterate_end_by_replayable_point));
        EXPECT_EQ(SCN::plus(prev_next_success_scn, 1), next_log_min_scn);
        // Raise the replayable point to the first log, but also move end_lsn_v to the start of the
        // first log (lsns_append[0] + 10). The log is unreadable because of end_lsn_v, so
        // next_min_scn is expected to be replayable_point_scn.
        // Because this log needed no controlled replay in the earlier next(), curr_read_pos_ was
        // advanced to the LogEntry header; the next call returns OB_ITER_END without reading data.
        end_lsn_v = lsns_append[0]+10;
        replayable_point_scn = logts_append[0];
        PALF_LOG(INFO, "runlin trace 8.4", K(iterator), K(replayable_point_scn), K(end_lsn_v), K(logts_append[0]), K(prev_next_success_scn));
        EXPECT_EQ(OB_ITER_END, iterator.next(replayable_point_scn, next_log_min_scn, iterate_end_by_replayable_point));
        EXPECT_EQ(replayable_point_scn, next_log_min_scn);
        EXPECT_EQ(iterate_end_by_replayable_point, false);
        PALF_LOG(INFO, "runlin trace 8.4.1", K(iterator), K(replayable_point_scn), K(end_lsn_v), K(logts_append[0]), K(prev_next_success_scn));
        EXPECT_EQ(OB_ITER_END, iterator.next(replayable_point_scn, next_log_min_scn, iterate_end_by_replayable_point));
        EXPECT_EQ(replayable_point_scn, next_log_min_scn);
        EXPECT_EQ(iterate_end_by_replayable_point, false);
        PALF_LOG(INFO, "runlin trace 8.4.2", K(iterator), K(replayable_point_scn), K(end_lsn_v), K(logts_append[0]), K(prev_next_success_scn));
        // Simulate: no log after prev_entry_, replayable_point_scn < prev_entry_scn_, and all later
        // logs need controlled replay. replayable_point_scn moving backwards cannot happen in
        // practice; here next_min_scn is expected to be prev_entry_scn_ + 1.
        replayable_point_scn = SCN::minus(prev_next_success_scn, 100);
        EXPECT_EQ(OB_ITER_END, iterator.next(replayable_point_scn, next_log_min_scn, iterate_end_by_replayable_point));
        EXPECT_EQ(SCN::plus(prev_next_success_scn, 1), next_log_min_scn);
        EXPECT_EQ(true, iterate_end_by_replayable_point);
        EXPECT_EQ(OB_ITER_END, iterator.next(replayable_point_scn, next_log_min_scn, iterate_end_by_replayable_point));
        EXPECT_EQ(SCN::plus(prev_next_success_scn, 1), next_log_min_scn);
        // Simulate: logs exist after prev_entry_.
        // Raise end_lsn_v to just past the start of the second log (lsns_append[1] + 2).
        end_lsn_v = lsns_append[1]+2;
        replayable_point_scn = logts_append[1];
        PALF_LOG(INFO, "runlin trace 8.5", K(iterator), K(replayable_point_scn), K(end_lsn_v), K(logts_append[1]), K(prev_next_success_scn));
        EXPECT_EQ(OB_SUCCESS, iterator.next(replayable_point_scn, next_log_min_scn, iterate_end_by_replayable_point));
        EXPECT_EQ(next_log_min_scn.is_valid(), false);
        prev_next_success_scn = iterator.iterator_impl_.prev_entry_scn_;
        EXPECT_EQ(prev_next_success_scn, logts_append[0]);
        PALF_LOG(INFO, "runlin trace 8.6", K(iterator), K(replayable_point_scn), K(end_lsn_v), K(logts_append[1]), K(prev_next_success_scn));
        // Simulate: a log exists after prev_entry_ but is not visible; the second log is held
        // back by replayable_point_scn.
        // Case: replayable_point_scn is before prev_entry_.
        replayable_point_scn.convert_for_tx(100);
        EXPECT_EQ(OB_ITER_END, iterator.next(replayable_point_scn, next_log_min_scn, iterate_end_by_replayable_point));
        EXPECT_EQ(SCN::plus(prev_next_success_scn, 1), next_log_min_scn);
        EXPECT_EQ(OB_ITER_END, iterator.next(replayable_point_scn, next_log_min_scn, iterate_end_by_replayable_point));
        EXPECT_EQ(SCN::plus(prev_next_success_scn, 1), next_log_min_scn);
        // Case: replayable_point_scn is after prev_entry_. Since a log exists after prev_entry_,
        // there can be no unread log between prev_entry_ and replayable_point_scn, so
        // next_log_min_scn is replayable_point_scn + 1.
        replayable_point_scn = SCN::plus(prev_next_success_scn , 2);
        PALF_LOG(INFO, "runlin trace 8.7", K(iterator), K(replayable_point_scn), K(end_lsn_v), K(logts_append[1]), K(prev_next_success_scn));
        EXPECT_EQ(OB_ITER_END, iterator.next(replayable_point_scn, next_log_min_scn, iterate_end_by_replayable_point));
        EXPECT_EQ(SCN::plus(replayable_point_scn, 1), next_log_min_scn);
        EXPECT_EQ(OB_ITER_END, iterator.next(replayable_point_scn, next_log_min_scn, iterate_end_by_replayable_point));
        EXPECT_EQ(SCN::plus(replayable_point_scn, 1), next_log_min_scn);
        // Case: replayable_point_scn is after curr_entry.
        replayable_point_scn.convert_from_ts(ObTimeUtility::current_time() + 100000000);
        PALF_LOG(INFO, "runlin trace 8.8", K(iterator), K(replayable_point_scn), K(end_lsn_v), K(logts_append[1]), K(prev_next_success_scn));
        EXPECT_EQ(OB_ITER_END, iterator.next(replayable_point_scn, next_log_min_scn, iterate_end_by_replayable_point));
        EXPECT_EQ(logts_append[1], next_log_min_scn);
        EXPECT_EQ(OB_ITER_END, iterator.next(replayable_point_scn, next_log_min_scn, iterate_end_by_replayable_point));
        EXPECT_EQ(logts_append[1], next_log_min_scn);
        end_lsn_v = LSN(1000000000);
        EXPECT_EQ(OB_SUCCESS, iterator.next(replayable_point_scn, next_log_min_scn, iterate_end_by_replayable_point));
        EXPECT_EQ(next_log_min_scn.is_valid(), false);
        EXPECT_EQ(iterator.iterator_impl_.prev_entry_scn_, logts_append[1]);
        prev_next_success_scn = iterator.iterator_impl_.prev_entry_scn_;
        // Verify controlled replay.
        replayable_point_scn.convert_for_tx(100);
        EXPECT_EQ(OB_ITER_END, iterator.next(replayable_point_scn, next_log_min_scn, iterate_end_by_replayable_point));
        EXPECT_EQ(SCN::plus(prev_next_success_scn, 1), next_log_min_scn);
        EXPECT_EQ(true, iterate_end_by_replayable_point);
      }
    }
  }
  // Verify restart: after restart_paxos_groups(), re-read every log of group `id` from LSN(0).
  restart_paxos_groups();
  {
    // NOTE(review): despite its name, this guard is fetched for group `id` (the group written as
    // `leader` above); id_raw_write is out of scope here. Confirm which group is intended.
    PalfHandleImplGuard raw_write_leader;
    PalfBufferIterator iterator;
    EXPECT_EQ(OB_SUCCESS, get_leader(id, raw_write_leader, leader_idx));
    PalfHandleImpl *raw_write_palf_handle_impl = raw_write_leader.palf_handle_impl_;
    auto get_file_end_lsn = []() -> LSN { return LSN(1000000000); };
    auto get_mode_version = [&mode_version, &mode_version_v]() -> int64_t {
      PALF_LOG(INFO, "runlin trace", K(*mode_version), K(mode_version_v));
      return *mode_version;
    };
    EXPECT_EQ(OB_SUCCESS, iterator.init(LSN(0), get_file_end_lsn, get_mode_version, &raw_write_palf_handle_impl->log_engine_.log_storage_));
    SCN max_scn = raw_write_leader.palf_handle_impl_->get_max_scn();
    // Five batches of 5 logs were submitted to group `id` above: batches 1-3, plus one batch in
    // each of the APPEND and back-to-RAW_WRITE sections.
    int64_t count = 5 + 5 + 5 + 5 + 5;
    while (count > 0) {
      EXPECT_EQ(OB_SUCCESS, iterator.next(max_scn));
      count--;
    }
    EXPECT_EQ(OB_ITER_END, iterator.next(max_scn));
    EXPECT_EQ(OB_ITER_END, read_log_from_memory(raw_write_leader));
  }
}

// test_gc_block: checks LogEngine::get_min_block_info_for_gc() as logs are written and blocks
// are deleted. Per the assertions below:
//   - on a freshly created group it returns OB_ENTRY_NOT_EXIST;
//   - after 31 logs of log_entry_size it returns OB_ERR_OUT_OF_UPPER_BOUND;
//   - after one more log it succeeds, and min_block_scn equals get_block_min_scn(1);
//   - after delete_block(0), min_block_max_scn_ is expected to be invalid;
//   - after 100 more logs, min_block_scn equals get_block_min_scn(2);
//   - after delete_block(1), it equals get_block_min_scn(3).
// The returned min_block_id itself is not asserted.
TEST_F(TestObSimpleLogClusterSingleReplica, test_gc_block) {
  SET_CASE_LOG_FILE(TEST_NAME, "test_gc_block");
  OB_LOGGER.set_log_level("TRACE");
  int64_t id = ATOMIC_AAF(&palf_id_, 1);
  int64_t leader_idx = 0;
  // NOTE(review): upper_aligned_log_tail and log_meta_storage are not used in this test.
  LSN upper_aligned_log_tail;
  PalfHandleImplGuard leader;
  EXPECT_EQ(OB_SUCCESS, create_paxos_group(id, leader_idx, leader));
  LogEngine *log_engine = &leader.palf_handle_impl_->log_engine_;
  LogStorage *log_meta_storage = &log_engine->log_meta_storage_;
  block_id_t min_block_id;
  share::SCN min_block_scn;
  // No log has been written to the new group yet.
  EXPECT_EQ(OB_ENTRY_NOT_EXIST, log_engine->get_min_block_info_for_gc(min_block_id, min_block_scn));
  // NOTE(review): submit_log() receives leader_idx in this position, whereas test_iterator passes
  // the palf id there; confirm which is intended.
  EXPECT_EQ(OB_SUCCESS, submit_log(leader, 31, leader_idx, log_entry_size));
  EXPECT_EQ(OB_SUCCESS, wait_lsn_until_flushed(leader.palf_handle_impl_->get_max_lsn(), leader));
  EXPECT_EQ(OB_ERR_OUT_OF_UPPER_BOUND, log_engine->get_min_block_info_for_gc(min_block_id, min_block_scn));
  EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, leader_idx, log_entry_size));
  EXPECT_EQ(OB_SUCCESS, wait_lsn_until_flushed(leader.palf_handle_impl_->get_max_lsn(), leader));
  block_id_t expect_block_id = 1;
  share::SCN expect_scn;
  EXPECT_EQ(OB_SUCCESS, log_engine->get_min_block_info_for_gc(min_block_id, min_block_scn));
  EXPECT_EQ(OB_SUCCESS, log_engine->get_block_min_scn(expect_block_id, expect_scn));
  EXPECT_EQ(expect_scn, min_block_scn);
  EXPECT_EQ(OB_SUCCESS, log_engine->delete_block(0));
  EXPECT_EQ(false, log_engine->min_block_max_scn_.is_valid());
  EXPECT_EQ(OB_SUCCESS, submit_log(leader, 100, leader_idx, log_entry_size));
  EXPECT_EQ(OB_SUCCESS, wait_lsn_until_flushed(leader.palf_handle_impl_->get_max_lsn(), leader));
  expect_block_id = 2;
  EXPECT_EQ(OB_SUCCESS, log_engine->get_min_block_info_for_gc(min_block_id, min_block_scn));
  EXPECT_EQ(OB_SUCCESS, log_engine->get_block_min_scn(expect_block_id, expect_scn));
  EXPECT_EQ(expect_scn, min_block_scn);
  EXPECT_EQ(OB_SUCCESS, log_engine->delete_block(1));
  expect_block_id = 3;
  EXPECT_EQ(OB_SUCCESS, log_engine->get_min_block_info_for_gc(min_block_id, min_block_scn));
  EXPECT_EQ(OB_SUCCESS, log_engine->get_block_min_scn(expect_block_id, expect_scn));
  EXPECT_EQ(expect_scn, min_block_scn);
}

TEST_F(TestObSimpleLogClusterSingleReplica, test_iterator_with_flashback) {
  SET_CASE_LOG_FILE(TEST_NAME, "test_iterator_with_flashback");
  OB_LOGGER.set_log_level("TRACE");
  int64_t id = ATOMIC_AAF(&palf_id_, 1);
  int64_t leader_idx = 0;
  PalfHandleImplGuard leader;
PalfHandleImplGuard raw_write_leader; EXPECT_EQ(OB_SUCCESS, create_paxos_group(id, leader_idx, leader)); PalfHandleImpl *palf_handle_impl = leader.palf_handle_impl_; const int64_t id_raw_write = ATOMIC_AAF(&palf_id_, 1); EXPECT_EQ(OB_SUCCESS, create_paxos_group(id_raw_write, leader_idx, raw_write_leader)); EXPECT_EQ(OB_SUCCESS, change_access_mode_to_raw_write(raw_write_leader)); EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, leader_idx, 200)); SCN max_scn1 = leader.palf_handle_impl_->get_max_scn(); LSN end_pos_of_log1 = leader.palf_handle_impl_->get_max_lsn(); EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, leader_idx, 200)); EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(leader, leader.palf_handle_impl_->get_max_lsn())); SCN max_scn2 = leader.palf_handle_impl_->get_max_scn(); LSN end_pos_of_log2 = leader.palf_handle_impl_->get_max_lsn(); EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(leader, leader.palf_handle_impl_->get_max_lsn())); // 提交几条日志到raw_write leader EXPECT_EQ(OB_ITER_END, read_and_submit_group_log(leader, raw_write_leader)); EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(raw_write_leader, raw_write_leader.palf_handle_impl_->get_max_lsn())); PalfBufferIterator iterator; EXPECT_EQ(OB_SUCCESS, raw_write_leader.palf_handle_impl_->alloc_palf_buffer_iterator(LSN(0), iterator)); // 迭代flashback之前的日志成功 SCN next_min_scn; SCN tmp_scn; tmp_scn.val_ = 1000; bool iterate_end_by_replayable_point = false; EXPECT_EQ(OB_SUCCESS, iterator.next(max_scn1, next_min_scn, iterate_end_by_replayable_point)); EXPECT_EQ(false, iterate_end_by_replayable_point); EXPECT_EQ(iterator.iterator_impl_.prev_entry_scn_, max_scn1); EXPECT_EQ(OB_ITER_END, iterator.next( max_scn1, next_min_scn, iterate_end_by_replayable_point)); EXPECT_EQ(true, iterate_end_by_replayable_point); EXPECT_EQ(end_pos_of_log1, iterator.iterator_impl_.log_storage_->get_lsn(iterator.iterator_impl_.curr_read_pos_)); EXPECT_EQ(SCN::plus(max_scn1, 1), next_min_scn); PALF_LOG(INFO, "runlin trace case1", K(iterator), 
K(end_pos_of_log1)); EXPECT_EQ(OB_SUCCESS, raw_write_leader.palf_handle_impl_->inner_flashback(max_scn2)); EXPECT_EQ(max_scn2, raw_write_leader.palf_handle_impl_->get_max_scn()); int64_t mode_version; switch_flashback_to_append(raw_write_leader, mode_version); // 磁盘上存在三条日志,一条日志已经迭代,另外一条日志没有迭代(raw_write),最后一条日志为Append EXPECT_EQ(OB_SUCCESS, submit_log(raw_write_leader, 1, leader_idx, 333)); EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(raw_write_leader, raw_write_leader.palf_handle_impl_->get_max_lsn())); EXPECT_EQ(OB_ITER_END, read_group_log(raw_write_leader, LSN(0))); SCN max_scn3 = raw_write_leader.palf_handle_impl_->get_max_scn(); PALF_LOG(INFO, "runlin trace case2", K(iterator), K(max_scn3), "end_lsn:", raw_write_leader.palf_handle_impl_->get_end_lsn()); LSN iterator_end_lsn = iterator.iterator_storage_.end_lsn_; // iterator内存中有几条日志,预期返回成功, 此时会清cache, 前一条日志的信息会被清除(raw_write日志) // 迭代器游标预期依旧指向第一条日志的终点, 由于受控回放,返回iterate_end EXPECT_EQ(OB_ITER_END, iterator.next( max_scn1, next_min_scn, iterate_end_by_replayable_point)); EXPECT_EQ(end_pos_of_log1, iterator.iterator_impl_.log_storage_->get_lsn(iterator.iterator_impl_.curr_read_pos_)); EXPECT_EQ(true, iterator.iterator_impl_.curr_entry_is_raw_write_); // 需要从磁盘上将后面两日志读上来,但由于受控回放不会吐出 // EXPECT_FALSE(iterator_end_lsn == iterator.iterator_storage_.end_lsn_); EXPECT_EQ(OB_SUCCESS, iterator.next(max_scn2, next_min_scn, iterate_end_by_replayable_point)); EXPECT_EQ(false, iterate_end_by_replayable_point); EXPECT_EQ(true, iterator.iterator_impl_.curr_entry_is_raw_write_); EXPECT_EQ(iterator.iterator_impl_.prev_entry_scn_, max_scn2); EXPECT_EQ(OB_SUCCESS, iterator.next(max_scn3, next_min_scn, iterate_end_by_replayable_point)); EXPECT_EQ(false, iterate_end_by_replayable_point); EXPECT_EQ(false, iterator.iterator_impl_.curr_entry_is_raw_write_); EXPECT_EQ(iterator.iterator_impl_.prev_entry_scn_, max_scn3); // raw_write_leader已经有三条日志, raw_write(1 log entry), raw_write(1), append(1), // 模拟一条group entry 中有多条小日志 LSN last_lsn = 
raw_write_leader.palf_handle_impl_->get_max_lsn(); SCN last_scn = raw_write_leader.palf_handle_impl_->get_max_scn(); LogIOWorker *io_worker = raw_write_leader.palf_handle_impl_->log_engine_.log_io_worker_; IOTaskCond cond(id_raw_write, raw_write_leader.palf_env_impl_->last_palf_epoch_); io_worker->submit_io_task(&cond); std::vector lsns; std::vector scns; EXPECT_EQ(OB_SUCCESS, submit_log(raw_write_leader, 10, 100, id_raw_write, lsns, scns)); int group_entry_num = 1; int first_log_entry_index = 0, last_log_entry_index = 0; for (int i = 1; i < 10; i++) { if (lsns[i-1]+100+sizeof(LogEntryHeader) == lsns[i]) { last_log_entry_index = i; } else { first_log_entry_index = i; group_entry_num++; PALF_LOG(INFO, "group entry", K(i-1)); } if (first_log_entry_index - last_log_entry_index > 2) { break; } } leader.reset(); if (first_log_entry_index != 1 && last_log_entry_index != 9) { PALF_LOG(INFO, "no group log has more than 2 log entry", K(first_log_entry_index), K(last_log_entry_index)); return; } cond.cond_.signal(); // 验证从一条包含多条LogEntry中日志中flashback,iterator迭代到中间的LogEntry后,flashback位点前还有几条LogEntry // LogGroup LogGroup LogGroup LogGroup LogGroup(9条小日志) EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(raw_write_leader, raw_write_leader.palf_handle_impl_->get_max_lsn())); { const int64_t id_new_raw_write = ATOMIC_AAF(&palf_id_, 1); PalfHandleImplGuard new_raw_write_leader; EXPECT_EQ(OB_SUCCESS, create_paxos_group(id_new_raw_write, leader_idx, new_raw_write_leader)); EXPECT_EQ(OB_SUCCESS, change_access_mode_to_raw_write(new_raw_write_leader)); EXPECT_EQ(OB_ITER_END, read_and_submit_group_log(raw_write_leader, new_raw_write_leader)); PalfBufferIterator buff_iterator; PalfGroupBufferIterator group_iterator; EXPECT_EQ(OB_SUCCESS, new_raw_write_leader.palf_handle_impl_->alloc_palf_buffer_iterator(LSN(0), buff_iterator)); EXPECT_EQ(OB_SUCCESS, new_raw_write_leader.palf_handle_impl_->alloc_palf_group_buffer_iterator(LSN(0), group_iterator)); SCN replayable_point_scn(SCN::min_scn()); // 
验证replayable_point_scn为min_scn EXPECT_EQ(OB_ITER_END, buff_iterator.next(replayable_point_scn, next_min_scn, iterate_end_by_replayable_point)); EXPECT_EQ(iterate_end_by_replayable_point, true); EXPECT_EQ(next_min_scn, SCN::plus(replayable_point_scn, 1)); EXPECT_EQ(buff_iterator.iterator_impl_.prev_entry_scn_, replayable_point_scn); EXPECT_EQ(OB_ITER_END, group_iterator.next(replayable_point_scn, next_min_scn, iterate_end_by_replayable_point)); EXPECT_EQ(iterate_end_by_replayable_point, true); EXPECT_EQ(next_min_scn, SCN::plus(replayable_point_scn, 1)); EXPECT_EQ(group_iterator.iterator_impl_.prev_entry_scn_, replayable_point_scn); // replayable_point_scn为第一条日志-1 replayable_point_scn = SCN::minus(max_scn1, 1); EXPECT_EQ(OB_ITER_END, buff_iterator.next(replayable_point_scn, next_min_scn, iterate_end_by_replayable_point)); EXPECT_EQ(iterate_end_by_replayable_point, true); EXPECT_EQ(next_min_scn, SCN::plus(replayable_point_scn, 1)); EXPECT_EQ(buff_iterator.iterator_impl_.prev_entry_scn_, replayable_point_scn); EXPECT_EQ(OB_ITER_END, group_iterator.next(replayable_point_scn, next_min_scn, iterate_end_by_replayable_point)); EXPECT_EQ(iterate_end_by_replayable_point, true); EXPECT_EQ(next_min_scn, SCN::plus(replayable_point_scn, 1)); EXPECT_EQ(group_iterator.iterator_impl_.prev_entry_scn_, replayable_point_scn); // replayable_point_scn为第一条日志 replayable_point_scn = max_scn1; EXPECT_EQ(OB_SUCCESS, buff_iterator.next(replayable_point_scn, next_min_scn, iterate_end_by_replayable_point)); EXPECT_EQ(iterate_end_by_replayable_point, false); EXPECT_EQ(buff_iterator.iterator_impl_.prev_entry_scn_, replayable_point_scn); EXPECT_EQ(OB_SUCCESS, group_iterator.next(replayable_point_scn, next_min_scn, iterate_end_by_replayable_point)); EXPECT_EQ(iterate_end_by_replayable_point, false); EXPECT_EQ(group_iterator.iterator_impl_.prev_entry_scn_, replayable_point_scn); // replayable_point_scn为第一条日志 + 1 replayable_point_scn = SCN::plus(max_scn1, 1); EXPECT_EQ(OB_ITER_END, 
buff_iterator.next(replayable_point_scn, next_min_scn, iterate_end_by_replayable_point)); EXPECT_EQ(next_min_scn, SCN::plus(replayable_point_scn, 1)); EXPECT_EQ(iterate_end_by_replayable_point, true); EXPECT_EQ(buff_iterator.iterator_impl_.prev_entry_scn_, replayable_point_scn); EXPECT_EQ(OB_ITER_END, group_iterator.next(replayable_point_scn, next_min_scn, iterate_end_by_replayable_point)); EXPECT_EQ(next_min_scn, SCN::plus(replayable_point_scn, 1)); EXPECT_EQ(iterate_end_by_replayable_point, true); EXPECT_EQ(group_iterator.iterator_impl_.prev_entry_scn_, replayable_point_scn); // 成功迭代第二条日志,第三条日志 replayable_point_scn = last_scn; EXPECT_EQ(OB_SUCCESS, buff_iterator.next(replayable_point_scn)); EXPECT_EQ(OB_SUCCESS, group_iterator.next(replayable_point_scn)); EXPECT_EQ(OB_SUCCESS, buff_iterator.next(replayable_point_scn)); EXPECT_EQ(OB_SUCCESS, group_iterator.next(replayable_point_scn)); // 第四条日志一定是LogGroupEntry replayable_point_scn = scns[0]; EXPECT_EQ(OB_SUCCESS, buff_iterator.next(replayable_point_scn)); EXPECT_EQ(buff_iterator.iterator_impl_.prev_entry_scn_, replayable_point_scn); EXPECT_EQ(OB_SUCCESS, group_iterator.next(replayable_point_scn)); EXPECT_EQ(group_iterator.iterator_impl_.prev_entry_scn_, replayable_point_scn); // 迭代第五条LogGroupEntry的第一条LogEntry EXPECT_EQ(OB_ITER_END, buff_iterator.next(replayable_point_scn, next_min_scn, iterate_end_by_replayable_point)); EXPECT_EQ(next_min_scn, SCN::plus(replayable_point_scn, 1)); EXPECT_EQ(iterate_end_by_replayable_point, true); EXPECT_EQ(buff_iterator.iterator_impl_.prev_entry_scn_, replayable_point_scn); EXPECT_NE(buff_iterator.iterator_impl_.log_storage_->get_lsn( buff_iterator.iterator_impl_.curr_read_pos_), lsns[1]); EXPECT_EQ(OB_ITER_END, group_iterator.next(replayable_point_scn, next_min_scn, iterate_end_by_replayable_point)); EXPECT_EQ(next_min_scn, SCN::plus(replayable_point_scn, 1)); EXPECT_EQ(iterate_end_by_replayable_point, true); EXPECT_EQ(group_iterator.iterator_impl_.prev_entry_scn_, 
replayable_point_scn); EXPECT_NE(group_iterator.iterator_impl_.log_storage_->get_lsn( group_iterator.iterator_impl_.curr_read_pos_), lsns[1]); // 由于被受控回放,buff_iterator以及group_iterator都没有推进curr_read_pos_ EXPECT_EQ(group_iterator.iterator_impl_.log_storage_->get_lsn( group_iterator.iterator_impl_.curr_read_pos_), buff_iterator.iterator_impl_.log_storage_->get_lsn( buff_iterator.iterator_impl_.curr_read_pos_)); // 成功迭代第五条LogGroupEntry的第一条LogEntry replayable_point_scn = scns[1]; EXPECT_EQ(OB_SUCCESS, buff_iterator.next(replayable_point_scn, next_min_scn, iterate_end_by_replayable_point)); EXPECT_EQ(buff_iterator.iterator_impl_.prev_entry_scn_, replayable_point_scn); EXPECT_EQ(buff_iterator.iterator_impl_.log_storage_->get_lsn( buff_iterator.iterator_impl_.curr_read_pos_), lsns[1]); // group iterator被受控回放, 但由于第五条日志的max_scn大于受控回放点,故受控回放 EXPECT_EQ(OB_ITER_END, group_iterator.next(replayable_point_scn, next_min_scn, iterate_end_by_replayable_point)); // 由于受控回放的group entry对应的min scn和replayable_point_scn一样,因此next_min_scn会被设置为replayable_point_scn EXPECT_EQ(next_min_scn, replayable_point_scn); EXPECT_EQ(iterate_end_by_replayable_point, true); EXPECT_EQ(group_iterator.iterator_impl_.prev_entry_scn_, scns[0]); EXPECT_NE(group_iterator.iterator_impl_.log_storage_->get_lsn( group_iterator.iterator_impl_.curr_read_pos_), lsns[1]); // 由于被第一条LogEntry受控回放,group_iterator没有推进curr_read_pos_, buff_iter推进了curr_read_pos_ EXPECT_NE(group_iterator.iterator_impl_.log_storage_->get_lsn( group_iterator.iterator_impl_.curr_read_pos_), buff_iterator.iterator_impl_.log_storage_->get_lsn( buff_iterator.iterator_impl_.curr_read_pos_)); // buff_iterator的游标到了第五条group_entry的第一条小日志 // grou_iterator的游标到了第五条group_entry开头 // sncs[0] 第四条group entry,scns[1] - scns[9]是第二条 // 第五条group entry的第五条小日志被flashback EXPECT_EQ(OB_SUCCESS, new_raw_write_leader.palf_handle_impl_->inner_flashback(scns[4])); EXPECT_EQ(new_raw_write_leader.palf_handle_impl_->get_max_scn(), scns[4]); EXPECT_EQ(OB_SUCCESS, 
change_access_mode_to_append(new_raw_write_leader)); // 提交一条group_entry // 对于buff_iterator, 存在两条group_entry未读,一条raw_rwrite(包含4条小日志,游标停留在第一条小日志末尾),一条append // 对于group_iterator, 存在三条group_entry未读,一条raw_rwrite(包含4条小日志,游标停留在group_entry头部),一条append EXPECT_EQ(OB_SUCCESS, submit_log(new_raw_write_leader, 1, leader_idx, 100)); EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(new_raw_write_leader, new_raw_write_leader.palf_handle_impl_->get_max_lsn())); // 对于buff_iterator // lsns[2]为第二条小日志开头,即第一条小日志末尾 // 验证游标起始位置为第一条小日志头部 // next 返回iterate是否清空cache // 迭代raw_write写入的小日志 // 迭代append写入的小日志 PALF_LOG(INFO, "rulin trace 1", K(lsns[2]), K(lsns[1]), K(lsns[0]), K(buff_iterator)); EXPECT_EQ(buff_iterator.iterator_impl_.log_storage_->get_lsn(buff_iterator.iterator_impl_.curr_read_pos_), lsns[1]); EXPECT_EQ(OB_ITER_END, buff_iterator.next(SCN::min_scn(), next_min_scn, iterate_end_by_replayable_point)); EXPECT_EQ(true, iterate_end_by_replayable_point); EXPECT_EQ(next_min_scn, SCN::plus(buff_iterator.iterator_impl_.prev_entry_scn_, 1)); EXPECT_EQ(0, buff_iterator.iterator_impl_.curr_read_pos_); // 迭代第二条日志 EXPECT_EQ(OB_SUCCESS, buff_iterator.next(SCN::max_scn())); // 迭代第三条日志 EXPECT_EQ(OB_SUCCESS, buff_iterator.next(SCN::max_scn())); // 迭代第四条日志 EXPECT_EQ(OB_SUCCESS, buff_iterator.next(SCN::max_scn())); // 迭代第五条日志(迭代新的GroupENtry, 非受控回放) EXPECT_EQ(OB_SUCCESS, buff_iterator.next(SCN::min_scn())); EXPECT_EQ(OB_ITER_END, buff_iterator.next(SCN::min_scn())); // 对于group_iterator // 验证游标起始位置为raw_write日志开头 // next 返回iterate是否清空cache // 迭代raw_write写入的大日志 // 迭代append写入的大日志 PALF_LOG(INFO, "rulin trace 2", K(lsns[2]), K(lsns[1]), K(lsns[0]), K(group_iterator)); EXPECT_EQ(group_iterator.iterator_impl_.log_storage_->get_lsn(group_iterator.iterator_impl_.curr_read_pos_), lsns[1] - sizeof(LogGroupEntryHeader)); EXPECT_EQ(OB_ITER_END, group_iterator.next(SCN::min_scn(), next_min_scn, iterate_end_by_replayable_point)); EXPECT_EQ(true, iterate_end_by_replayable_point); EXPECT_EQ(next_min_scn, 
SCN::plus(group_iterator.iterator_impl_.prev_entry_scn_, 1)); // 迭代raw_write日志 EXPECT_EQ(OB_SUCCESS, group_iterator.next(SCN::max_scn())); EXPECT_EQ(OB_SUCCESS, group_iterator.next(SCN::max_scn())); EXPECT_EQ(OB_ITER_END, group_iterator.next(SCN::max_scn())); } // 验证从一条包含多条LogEntry中日志中flashback,iterator迭代到中间的LogEntry后,flashback位点前没有LogEntry // LogGroup LogGroup LogGroup LogGroup LogGroup(9条小日志) EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(raw_write_leader, raw_write_leader.palf_handle_impl_->get_max_lsn())); { const int64_t id_new_raw_write = ATOMIC_AAF(&palf_id_, 1); PalfHandleImplGuard new_raw_write_leader; EXPECT_EQ(OB_SUCCESS, create_paxos_group(id_new_raw_write, leader_idx, new_raw_write_leader)); EXPECT_EQ(OB_SUCCESS, change_access_mode_to_raw_write(new_raw_write_leader)); EXPECT_EQ(OB_ITER_END, read_and_submit_group_log(raw_write_leader, new_raw_write_leader)); PalfBufferIterator buff_iterator; PalfGroupBufferIterator group_iterator; EXPECT_EQ(OB_SUCCESS, new_raw_write_leader.palf_handle_impl_->alloc_palf_buffer_iterator(LSN(0), buff_iterator)); EXPECT_EQ(OB_SUCCESS, new_raw_write_leader.palf_handle_impl_->alloc_palf_group_buffer_iterator(LSN(0), group_iterator)); // 成功迭代第一条日志,第二条日志,第三条日志 SCN replayable_point_scn(last_scn); EXPECT_EQ(OB_SUCCESS, buff_iterator.next(replayable_point_scn, next_min_scn, iterate_end_by_replayable_point)); EXPECT_EQ(OB_SUCCESS, group_iterator.next(replayable_point_scn, next_min_scn, iterate_end_by_replayable_point)); EXPECT_EQ(OB_SUCCESS, buff_iterator.next(replayable_point_scn)); EXPECT_EQ(OB_SUCCESS, group_iterator.next(replayable_point_scn)); EXPECT_EQ(OB_SUCCESS, buff_iterator.next(replayable_point_scn)); EXPECT_EQ(OB_SUCCESS, group_iterator.next(replayable_point_scn)); // 第四条日志一定是LogGroupEntry replayable_point_scn = scns[0]; EXPECT_EQ(OB_SUCCESS, buff_iterator.next(replayable_point_scn)); EXPECT_EQ(buff_iterator.iterator_impl_.prev_entry_scn_, replayable_point_scn); EXPECT_EQ(OB_SUCCESS, 
group_iterator.next(replayable_point_scn)); EXPECT_EQ(group_iterator.iterator_impl_.prev_entry_scn_, replayable_point_scn); // 迭代第五条LogGroupEntry的第一条LogEntry EXPECT_EQ(OB_ITER_END, buff_iterator.next(replayable_point_scn, next_min_scn, iterate_end_by_replayable_point)); EXPECT_EQ(next_min_scn, SCN::plus(replayable_point_scn, 1)); EXPECT_EQ(iterate_end_by_replayable_point, true); EXPECT_EQ(buff_iterator.iterator_impl_.prev_entry_scn_, replayable_point_scn); EXPECT_NE(buff_iterator.iterator_impl_.log_storage_->get_lsn( buff_iterator.iterator_impl_.curr_read_pos_), lsns[1]); EXPECT_EQ(OB_ITER_END, group_iterator.next(replayable_point_scn, next_min_scn, iterate_end_by_replayable_point)); EXPECT_EQ(next_min_scn, SCN::plus(replayable_point_scn, 1)); EXPECT_EQ(iterate_end_by_replayable_point, true); EXPECT_EQ(group_iterator.iterator_impl_.prev_entry_scn_, replayable_point_scn); EXPECT_NE(group_iterator.iterator_impl_.log_storage_->get_lsn( group_iterator.iterator_impl_.curr_read_pos_), lsns[1]); // 由于被受控回放,buff_iterator以及group_iterator都没有推进curr_read_pos_ EXPECT_EQ(group_iterator.iterator_impl_.log_storage_->get_lsn( group_iterator.iterator_impl_.curr_read_pos_), buff_iterator.iterator_impl_.log_storage_->get_lsn( buff_iterator.iterator_impl_.curr_read_pos_)); // 成功迭代第五条LogGroupEntry的第一条LogEntry replayable_point_scn = scns[1]; EXPECT_EQ(OB_SUCCESS, buff_iterator.next(replayable_point_scn, next_min_scn, iterate_end_by_replayable_point)); EXPECT_EQ(buff_iterator.iterator_impl_.prev_entry_scn_, replayable_point_scn); EXPECT_EQ(buff_iterator.iterator_impl_.log_storage_->get_lsn( buff_iterator.iterator_impl_.curr_read_pos_), lsns[1]); // group iterator被受控回放, 但由于第五条日志的max_scn大于受控回放点,故受控回放 EXPECT_EQ(OB_ITER_END, group_iterator.next(replayable_point_scn, next_min_scn, iterate_end_by_replayable_point)); // 由于受控回放的group entry对应的min scn和replayable_point_scn一样,因此next_min_scn会被设置为replayable_point_scn EXPECT_EQ(next_min_scn, replayable_point_scn); 
EXPECT_EQ(iterate_end_by_replayable_point, true); EXPECT_EQ(group_iterator.iterator_impl_.prev_entry_scn_, scns[0]); EXPECT_NE(group_iterator.iterator_impl_.log_storage_->get_lsn( group_iterator.iterator_impl_.curr_read_pos_), lsns[1]); // 由于被第一条LogEntry受控回放,group_iterator没有推进curr_read_pos_, buff_iter推进了curr_read_pos_ EXPECT_NE(group_iterator.iterator_impl_.log_storage_->get_lsn( group_iterator.iterator_impl_.curr_read_pos_), buff_iterator.iterator_impl_.log_storage_->get_lsn( buff_iterator.iterator_impl_.curr_read_pos_)); // 迭代日志发现需要受控回放 EXPECT_EQ(OB_ITER_END, buff_iterator.next(scns[1], next_min_scn, iterate_end_by_replayable_point)); // buff_iterator的游标到了第五条group_entry的第一条小日志末尾 // grou_iterator的游标到了第五条group_entry开头 // sncs[0] 第四条group entry,scns[1] - scns[9]是第二条 // 第五条group entry的第二条小日志被flashback EXPECT_EQ(OB_SUCCESS, new_raw_write_leader.palf_handle_impl_->inner_flashback(scns[2])); EXPECT_EQ(new_raw_write_leader.palf_handle_impl_->get_max_scn(), scns[2]); EXPECT_EQ(OB_SUCCESS, change_access_mode_to_append(new_raw_write_leader)); // 提交一条group_entry // 对于buff_iterator, 存在两条group_entry未读,一条raw_rwrite(包含4条小日志,游标停留在第一条小日志末尾),一条append // 对于group_iterator, 存在三条group_entry未读,一条raw_rwrite(包含4条小日志,游标停留在group_entry头部),一条append EXPECT_EQ(OB_SUCCESS, submit_log(new_raw_write_leader, 1, leader_idx, 100)); EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(new_raw_write_leader, new_raw_write_leader.palf_handle_impl_->get_max_lsn())); // 对于buff_iterator // lsns[2]为第二条小日志开头,即第一条小日志末尾 // 验证游标起始位置为第一条小日志头部 // next 返回iterate是否清空cache // 迭代raw_write写入的小日志 // 迭代append写入的小日志 PALF_LOG(INFO, "rulin trace 3", K(lsns[2]), K(lsns[1]), K(lsns[0]), K(buff_iterator)); EXPECT_EQ(buff_iterator.iterator_impl_.log_storage_->get_lsn(buff_iterator.iterator_impl_.curr_read_pos_), lsns[2]); EXPECT_EQ(OB_ITER_END, buff_iterator.next(SCN::min_scn(), next_min_scn, iterate_end_by_replayable_point)); EXPECT_EQ(true, iterate_end_by_replayable_point); EXPECT_EQ(next_min_scn, 
SCN::plus(buff_iterator.iterator_impl_.prev_entry_scn_, 1)); EXPECT_EQ(0, buff_iterator.iterator_impl_.curr_read_pos_); // 迭代第二条小日志 EXPECT_EQ(OB_SUCCESS, buff_iterator.next(SCN::max_scn())); // 迭代新写入的LogGroupEntry, 不需要受控回放 EXPECT_EQ(OB_SUCCESS, buff_iterator.next(SCN::min_scn())); EXPECT_EQ(OB_ITER_END, buff_iterator.next(SCN::min_scn())); // 对于group_iterator // 验证游标起始位置为raw_write日志开头 // next 返回iterate是否清空cache // 迭代raw_write写入的大日志 // 迭代append写入的大日志 PALF_LOG(INFO, "rulin trace 4", K(lsns[2]), K(lsns[1]), K(lsns[0]), K(group_iterator)); EXPECT_EQ(group_iterator.iterator_impl_.log_storage_->get_lsn(group_iterator.iterator_impl_.curr_read_pos_), lsns[1] - sizeof(LogGroupEntryHeader)); EXPECT_EQ(OB_ITER_END, group_iterator.next(SCN::min_scn(), next_min_scn, iterate_end_by_replayable_point)); EXPECT_EQ(true, iterate_end_by_replayable_point); EXPECT_EQ(next_min_scn, SCN::plus(group_iterator.iterator_impl_.prev_entry_scn_, 1)); // 迭代raw_write日志 EXPECT_EQ(OB_SUCCESS, group_iterator.next(SCN::max_scn())); // 迭代新的GruopEntry EXPECT_EQ(OB_SUCCESS, group_iterator.next(SCN::min_scn())); EXPECT_EQ(OB_ITER_END, group_iterator.next(SCN::min_scn())); } // 验证一条LogGroupEntry需要受控回放,buff iterator不能更新accumlate_checksum和curr_read_pos_ // LogGroup LogGroup LogGroup LogGroup LogGroup(9条小日志) // last_scn scns[0] scns[1]... 
{ const int64_t id_new_raw_write = ATOMIC_AAF(&palf_id_, 1); PalfHandleImplGuard new_raw_write_leader; EXPECT_EQ(OB_SUCCESS, create_paxos_group(id_new_raw_write, leader_idx, new_raw_write_leader)); EXPECT_EQ(OB_SUCCESS, change_access_mode_to_raw_write(new_raw_write_leader)); EXPECT_EQ(OB_ITER_END, read_and_submit_group_log(raw_write_leader, new_raw_write_leader)); PalfBufferIterator iterator; EXPECT_EQ(OB_SUCCESS, new_raw_write_leader.palf_handle_impl_->alloc_palf_buffer_iterator(LSN(0), iterator)); SCN replayable_point_scn(last_scn); EXPECT_EQ(OB_SUCCESS, iterator.next(replayable_point_scn)); EXPECT_EQ(OB_SUCCESS, iterator.next(replayable_point_scn)); EXPECT_EQ(OB_SUCCESS, iterator.next(replayable_point_scn)); EXPECT_EQ(OB_ITER_END, iterator.next(replayable_point_scn, next_min_scn, iterate_end_by_replayable_point)); EXPECT_EQ(iterate_end_by_replayable_point, true); EXPECT_EQ(next_min_scn, SCN::plus(last_scn, 1)); replayable_point_scn = scns[0]; EXPECT_EQ(OB_SUCCESS, iterator.next(replayable_point_scn)); // scns[1]对应的日志无法吐出 EXPECT_EQ(OB_ITER_END, iterator.next(replayable_point_scn, next_min_scn, iterate_end_by_replayable_point)); EXPECT_EQ(next_min_scn, SCN::plus(scns[0], 1)); EXPECT_EQ(iterator.iterator_impl_.prev_entry_scn_, scns[0]); // flashback到scns[0] EXPECT_EQ(OB_SUCCESS, new_raw_write_leader.palf_handle_impl_->inner_flashback(scns[0])); EXPECT_EQ(OB_SUCCESS, change_access_mode_to_append(new_raw_write_leader)); EXPECT_EQ(OB_SUCCESS, submit_log(new_raw_write_leader, 1, leader_idx, 100)); EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(new_raw_write_leader, new_raw_write_leader.palf_handle_impl_->get_max_lsn())); // scns[0]对应的日志为raw write, 被flashback了, iterator停在scns[0]的末尾 // 迭代新写入的日志成功 EXPECT_EQ(OB_SUCCESS, iterator.next(SCN::min_scn(), next_min_scn, iterate_end_by_replayable_point)); EXPECT_EQ(OB_ITER_END, iterator.next(SCN::min_scn())); } // 验证一条padding LogGroupEntry需要受控回放 { const int64_t append_id = ATOMIC_AAF(&palf_id_, 1); PalfHandleImplGuard 
append_leader; EXPECT_EQ(OB_SUCCESS, create_paxos_group(append_id, leader_idx, append_leader)); EXPECT_EQ(OB_SUCCESS, submit_log(append_leader, 31, leader_idx, log_entry_size)); const LSN padding_start_lsn = append_leader.get_palf_handle_impl()->get_max_lsn(); EXPECT_EQ(OB_SUCCESS, submit_log(append_leader, 1, leader_idx, log_entry_size)); EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(append_leader, append_leader.get_palf_handle_impl()->get_max_lsn())); SCN padding_scn = append_leader.get_palf_handle_impl()->get_max_scn(); padding_scn = padding_scn.minus(padding_scn, 1); const int64_t raw_write_id = ATOMIC_AAF(&palf_id_, 1); PalfHandleImplGuard raw_write_leader; EXPECT_EQ(OB_SUCCESS, create_paxos_group(raw_write_id, leader_idx, raw_write_leader)); EXPECT_EQ(OB_SUCCESS, change_access_mode_to_raw_write(raw_write_leader)); EXPECT_EQ(OB_ITER_END, read_and_submit_group_log(append_leader, raw_write_leader)); EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(raw_write_leader, raw_write_leader.get_palf_handle_impl()->get_max_lsn())); switch_append_to_flashback(raw_write_leader, mode_version); PalfBufferIterator buff_iterator; PalfGroupBufferIterator group_buff_iterator; PalfBufferIterator buff_iterator_padding_start; PalfGroupBufferIterator group_buff_iterator_padding_start; EXPECT_EQ(OB_SUCCESS, raw_write_leader.get_palf_handle_impl()->alloc_palf_buffer_iterator(LSN(0), buff_iterator)); EXPECT_EQ(OB_SUCCESS, raw_write_leader.get_palf_handle_impl()->alloc_palf_group_buffer_iterator(LSN(0), group_buff_iterator)); EXPECT_EQ(OB_SUCCESS, raw_write_leader.get_palf_handle_impl()->alloc_palf_buffer_iterator(LSN(0), buff_iterator_padding_start)); EXPECT_EQ(OB_SUCCESS, raw_write_leader.get_palf_handle_impl()->alloc_palf_group_buffer_iterator(LSN(0), group_buff_iterator_padding_start)); SCN next_min_scn; bool iterate_end_by_replayable_point = false; EXPECT_EQ(OB_ITER_END, buff_iterator.next(share::SCN::min_scn(), next_min_scn, iterate_end_by_replayable_point)); EXPECT_EQ(true, 
iterate_end_by_replayable_point); EXPECT_EQ(OB_ITER_END, group_buff_iterator.next(share::SCN::min_scn(), next_min_scn, iterate_end_by_replayable_point)); EXPECT_EQ(true, iterate_end_by_replayable_point); // 一共有33条日志,包括padding SCN replayable_point_scn = padding_scn.minus(padding_scn, 1); // 直到padding日志受控回放 int ret = OB_SUCCESS; while (OB_SUCC(buff_iterator.next(replayable_point_scn, next_min_scn, iterate_end_by_replayable_point))) { } ret = OB_SUCCESS; while (OB_SUCC(buff_iterator_padding_start.next(replayable_point_scn, next_min_scn, iterate_end_by_replayable_point))) { } EXPECT_EQ(OB_ITER_END, ret); EXPECT_EQ(true, iterate_end_by_replayable_point); EXPECT_EQ(next_min_scn, padding_scn); ret = OB_SUCCESS; while (OB_SUCC(group_buff_iterator.next(replayable_point_scn, next_min_scn, iterate_end_by_replayable_point))) { } ret = OB_SUCCESS; while (OB_SUCC(group_buff_iterator_padding_start.next(replayable_point_scn, next_min_scn, iterate_end_by_replayable_point))) { } EXPECT_EQ(OB_ITER_END, ret); EXPECT_EQ(true, iterate_end_by_replayable_point); EXPECT_EQ(next_min_scn, padding_scn); EXPECT_EQ(false, buff_iterator.iterator_impl_.curr_entry_is_padding_); EXPECT_EQ(false, group_buff_iterator.iterator_impl_.curr_entry_is_padding_); // flashback到padding日志尾 EXPECT_EQ(OB_SUCCESS, raw_write_leader.get_palf_handle_impl()->flashback(mode_version, padding_scn, timeout_ts_us)); EXPECT_EQ(OB_SUCCESS, buff_iterator.next(padding_scn, next_min_scn, iterate_end_by_replayable_point)); LogEntry padding_log_entry; LSN padding_log_lsn; EXPECT_EQ(OB_SUCCESS, buff_iterator.get_entry(padding_log_entry, padding_log_lsn)); EXPECT_EQ(true, padding_log_entry.check_integrity()); EXPECT_EQ(true, padding_log_entry.header_.is_padding_log_()); EXPECT_EQ(padding_scn, padding_log_entry.header_.scn_); EXPECT_EQ(false, buff_iterator.iterator_impl_.padding_entry_scn_.is_valid()); EXPECT_EQ(OB_SUCCESS, group_buff_iterator.next(padding_scn, next_min_scn, iterate_end_by_replayable_point)); LogGroupEntry 
padding_group_entry; LSN padding_group_lsn; EXPECT_EQ(OB_SUCCESS, group_buff_iterator.get_entry(padding_group_entry, padding_group_lsn)); EXPECT_EQ(true, padding_group_entry.check_integrity()); EXPECT_EQ(true, padding_group_entry.header_.is_padding_log()); // 对于LogGruopEntry的iterator,在construct_padding_log_entry_后,不会重置padding状态 EXPECT_EQ(true, group_buff_iterator.iterator_impl_.padding_entry_scn_.is_valid()); EXPECT_EQ(padding_log_entry.header_.scn_, padding_group_entry.header_.max_scn_); // flashback到padding日志头 EXPECT_EQ(OB_SUCCESS, raw_write_leader.get_palf_handle_impl()->flashback(mode_version, padding_scn.minus(padding_scn, 1), timeout_ts_us)); // 预期是由于文件长度导致的OB_ITER_END EXPECT_EQ(OB_ITER_END, buff_iterator_padding_start.next(padding_scn, next_min_scn, iterate_end_by_replayable_point)); EXPECT_EQ(false, iterate_end_by_replayable_point); EXPECT_GE(next_min_scn, buff_iterator_padding_start.iterator_impl_.prev_entry_scn_); EXPECT_EQ(OB_ITER_END, group_buff_iterator_padding_start.next(padding_scn, next_min_scn, iterate_end_by_replayable_point)); EXPECT_EQ(false, iterate_end_by_replayable_point); EXPECT_GE(next_min_scn, group_buff_iterator_padding_start.iterator_impl_.prev_entry_scn_); switch_flashback_to_append(raw_write_leader, mode_version); EXPECT_EQ(OB_SUCCESS, submit_log(raw_write_leader, 100, leader_idx, 1000)); EXPECT_EQ(OB_SUCCESS, buff_iterator_padding_start.next()); EXPECT_EQ(OB_SUCCESS, group_buff_iterator_padding_start.next()); } } TEST_F(TestObSimpleLogClusterSingleReplica, test_raw_read) { SET_CASE_LOG_FILE(TEST_NAME, "test_raw_read"); OB_LOGGER.set_log_level("TRACE"); int64_t id = ATOMIC_AAF(&palf_id_, 1); int64_t leader_idx = 0; PalfHandleImplGuard leader; const int64_t read_buf_ptr_len = PALF_BLOCK_SIZE; char *read_buf_ptr = reinterpret_cast(mtl_malloc_align( LOG_DIO_ALIGN_SIZE, PALF_BLOCK_SIZE + 2 * LOG_DIO_ALIGN_SIZE, "mittest")); EXPECT_EQ(OB_SUCCESS, create_paxos_group(id, leader_idx, leader)); PalfOptions opts; PalfEnvImpl *palf_env_impl = 
dynamic_cast(get_cluster()[0]->get_palf_env()); ASSERT_NE(nullptr, palf_env_impl); palf_env_impl->get_options(opts); opts.enable_log_cache_ = true; palf_env_impl->update_options(opts); // 提交100条日志, 每条日志大小为30K. { char *read_buf = read_buf_ptr; int64_t nbytes = read_buf_ptr_len; int64_t out_read_size = 0; EXPECT_EQ(OB_SUCCESS, submit_log(leader, 100, leader_idx, 1000)); EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(leader, leader.palf_handle_impl_->get_max_lsn())); const int64_t curr_real_size = leader.palf_handle_impl_->get_max_lsn() - LSN(PALF_INITIAL_LSN_VAL); const LSN invalid_lsn(1); char *invalid_read_buf = read_buf_ptr + 1; const int64_t invalid_nbytes = 1; // 非DIO对齐度 palf::LogIOContext io_ctx(palf::LogIOUser::META_INFO); EXPECT_EQ(OB_INVALID_ARGUMENT, leader.palf_handle_impl_->raw_read( invalid_lsn, invalid_read_buf, invalid_nbytes, out_read_size, io_ctx)); EXPECT_EQ(OB_INVALID_ARGUMENT, leader.palf_handle_impl_->raw_read( LSN(PALF_INITIAL_LSN_VAL), invalid_read_buf, invalid_nbytes, out_read_size, io_ctx)); EXPECT_EQ(OB_INVALID_ARGUMENT, leader.palf_handle_impl_->raw_read( invalid_lsn, read_buf, invalid_nbytes, out_read_size, io_ctx)); EXPECT_EQ(OB_INVALID_ARGUMENT, leader.palf_handle_impl_->raw_read( invalid_lsn, invalid_read_buf, nbytes, out_read_size, io_ctx)); EXPECT_EQ(OB_INVALID_ARGUMENT, leader.palf_handle_impl_->raw_read( LSN(PALF_INITIAL_LSN_VAL), read_buf, invalid_nbytes, out_read_size, io_ctx)); PALF_LOG(INFO, "raw read success"); // 读取成功 EXPECT_EQ(OB_SUCCESS, leader.palf_handle_impl_->raw_read(LSN(PALF_INITIAL_LSN_VAL), read_buf, PALF_BLOCK_SIZE, out_read_size, io_ctx)); EXPECT_LE(out_read_size, PALF_BLOCK_SIZE); EXPECT_EQ(out_read_size, curr_real_size); // 读取长度超过end_lsn PALF_LOG(INFO, "raw read return OB_ERR_OUT_OF_UPPER_BOUND"); LSN out_of_upper_bound(PALF_BLOCK_SIZE); EXPECT_EQ(OB_ERR_OUT_OF_UPPER_BOUND, leader.palf_handle_impl_->raw_read( out_of_upper_bound, read_buf, PALF_BLOCK_SIZE, out_read_size, io_ctx)); // 模拟生成2个文件 EXPECT_EQ(OB_SUCCESS, 
submit_log(leader, 40, leader_idx, MAX_LOG_BODY_SIZE)); EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(leader, leader.palf_handle_impl_->get_max_lsn())); // 模拟跨文件读 PALF_LOG(INFO, "raw read cross file"); LSN curr_read_lsn(lower_align(PALF_BLOCK_SIZE/2, LOG_DIO_ALIGN_SIZE)); int64_t expected_read_size = LSN(PALF_BLOCK_SIZE) - curr_read_lsn; io_ctx.iterator_info_.allow_filling_cache_ = false; EXPECT_EQ(OB_SUCCESS, leader.palf_handle_impl_->raw_read( curr_read_lsn, read_buf, PALF_BLOCK_SIZE, out_read_size, io_ctx)); EXPECT_EQ(out_read_size, expected_read_size); //io_ctx.set_allow_filling_cache(true); //EXPECT_EQ(OB_BUF_NOT_ENOUGH, leader.palf_handle_impl_->raw_read( // curr_read_lsn, read_buf, expected_read_size, out_read_size, io_ctx)); EXPECT_EQ(OB_SUCCESS, leader.palf_handle_impl_->delete_block(0)); // 模拟lower_bound PALF_LOG(INFO, "raw read return OB_ERR_OUT_OF_LOWER_BOUND"); LSN out_of_lower_bound(PALF_INITIAL_LSN_VAL); EXPECT_EQ(OB_ERR_OUT_OF_LOWER_BOUND, leader.palf_handle_impl_->raw_read(out_of_lower_bound, read_buf, PALF_BLOCK_SIZE, out_read_size, io_ctx)); if (NULL != read_buf_ptr) { mtl_free_align(read_buf_ptr); } } } TEST_F(TestObSimpleLogClusterSingleReplica, test_iow_memleak) { SET_CASE_LOG_FILE(TEST_NAME, "test_iow"); OB_LOGGER.set_log_level("INFO"); int64_t id = ATOMIC_AAF(&palf_id_, 1); int64_t leader_idx = 0; // case1: palf epoch has been changed during do_task { PalfHandleImplGuard leader; EXPECT_EQ(OB_SUCCESS, create_paxos_group(id, leader_idx, leader)); LogIOWorker *iow = leader.get_palf_handle_impl()->log_engine_.log_io_worker_; IPalfEnvImpl *palf_env_impl = leader.get_palf_handle_impl()->palf_env_impl_; ObILogAllocator *allocator = palf_env_impl->get_log_allocator(); EXPECT_EQ(OB_SUCCESS, submit_log(leader, 32, leader_idx, log_entry_size)); EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(leader, leader.palf_handle_impl_->sw_.get_max_lsn())); LSN end_lsn = leader.get_palf_handle_impl()->get_end_lsn(); IOTaskCond cond(id, 
leader.palf_env_impl_->last_palf_epoch_); EXPECT_EQ(OB_SUCCESS, iow->submit_io_task(&cond)); sleep(1); EXPECT_EQ(OB_SUCCESS, leader.get_palf_handle_impl()->set_base_lsn(end_lsn)); EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, leader_idx, log_entry_size)); EXPECT_EQ(OB_SUCCESS, leader.get_palf_handle_impl()->log_engine_.submit_purge_throttling_task(PurgeThrottlingType::PURGE_BY_GET_MC_REQ)); EXPECT_EQ(OB_SUCCESS, submit_log(leader, 2, leader_idx, log_entry_size)); EXPECT_NE(0, allocator->flying_log_task_); EXPECT_NE(0, allocator->flying_meta_task_); leader.get_palf_handle_impl()->log_engine_.palf_epoch_++; EXPECT_EQ(OB_SUCCESS, submit_log(leader, 10, leader_idx)); EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, leader_idx, log_entry_size)); EXPECT_EQ(OB_SUCCESS, submit_log(leader, 10, leader_idx)); EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, leader_idx, log_entry_size)); leader.get_palf_handle_impl()->log_engine_.palf_epoch_++; cond.cond_.signal(); PALF_LOG(INFO, "runlin trace submit log 1"); while (iow->queue_.size() > 0) { PALF_LOG(INFO, "queue size is not zero", "size", iow->queue_.size()); sleep(1); } EXPECT_EQ(0, allocator->flying_log_task_); EXPECT_EQ(0, allocator->flying_meta_task_); } delete_paxos_group(id); // case2: palf epoch has been changed during after_consume { PalfHandleImplGuard leader; EXPECT_EQ(OB_SUCCESS, create_paxos_group(id, leader_idx, leader)); LogIOWorker *iow = leader.get_palf_handle_impl()->log_engine_.log_io_worker_; IPalfEnvImpl *palf_env_impl = leader.get_palf_handle_impl()->palf_env_impl_; ObILogAllocator *allocator = palf_env_impl->get_log_allocator(); EXPECT_EQ(OB_SUCCESS, submit_log(leader, 32, leader_idx, log_entry_size)); EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(leader, leader.palf_handle_impl_->sw_.get_max_lsn())); LSN end_lsn = leader.get_palf_handle_impl()->get_end_lsn(); IOTaskConsumeCond consume_cond(id, leader.palf_env_impl_->last_palf_epoch_); EXPECT_EQ(OB_SUCCESS, iow->submit_io_task(&consume_cond)); sleep(1); 
EXPECT_EQ(OB_SUCCESS, leader.get_palf_handle_impl()->set_base_lsn(end_lsn)); EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, leader_idx, log_entry_size)); EXPECT_EQ(OB_SUCCESS, leader.get_palf_handle_impl()->log_engine_.submit_purge_throttling_task(PurgeThrottlingType::PURGE_BY_GET_MC_REQ)); EXPECT_EQ(OB_SUCCESS, submit_log(leader, 2, leader_idx, log_entry_size)); EXPECT_NE(0, allocator->flying_log_task_); EXPECT_NE(0, allocator->flying_meta_task_); leader.get_palf_handle_impl()->log_engine_.palf_epoch_++; EXPECT_EQ(OB_SUCCESS, submit_log(leader, 10, leader_idx)); EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, leader_idx, log_entry_size)); EXPECT_EQ(OB_SUCCESS, submit_log(leader, 10, leader_idx)); EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, leader_idx, log_entry_size)); leader.get_palf_handle_impl()->log_engine_.palf_epoch_++; consume_cond.cond_.signal(); PALF_LOG(INFO, "runlin trace submit log 2"); IOTaskVerify verify(id, leader.get_palf_handle_impl()->log_engine_.palf_epoch_); EXPECT_EQ(OB_SUCCESS, iow->submit_io_task(&verify)); while (verify.count_ == 0) { PALF_LOG(INFO, "queue size is not zero", "size", iow->queue_.size()); sleep(1); } EXPECT_EQ(0, allocator->flying_log_task_); EXPECT_EQ(0, allocator->flying_meta_task_); } delete_paxos_group(id); // case3: palf epoch has been changed during do_task when there is no io reduce { PalfHandleImplGuard leader; EXPECT_EQ(OB_SUCCESS, create_paxos_group(id, leader_idx, leader)); LogIOWorker *iow = leader.get_palf_handle_impl()->log_engine_.log_io_worker_; IPalfEnvImpl *palf_env_impl = leader.get_palf_handle_impl()->palf_env_impl_; bool need_stop = false; std::thread throttling_th([palf_env_impl, &need_stop](){ PalfEnvImpl *impl = dynamic_cast(palf_env_impl); while (!need_stop) { impl->log_io_worker_wrapper_.notify_need_writing_throttling(true); usleep(1000); } }); ObILogAllocator *allocator = palf_env_impl->get_log_allocator(); EXPECT_EQ(OB_SUCCESS, submit_log(leader, 32, leader_idx, log_entry_size)); EXPECT_EQ(OB_SUCCESS, 
wait_until_has_committed(leader, leader.palf_handle_impl_->sw_.get_max_lsn())); LSN end_lsn = leader.get_palf_handle_impl()->get_end_lsn(); // case2: palf epoch has been changed during after_consume IOTaskConsumeCond consume_cond(id, leader.palf_env_impl_->last_palf_epoch_); EXPECT_EQ(OB_SUCCESS, iow->submit_io_task(&consume_cond)); sleep(3); EXPECT_EQ(OB_SUCCESS, leader.get_palf_handle_impl()->set_base_lsn(end_lsn)); EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, leader_idx, log_entry_size)); EXPECT_EQ(OB_SUCCESS, leader.get_palf_handle_impl()->log_engine_.submit_purge_throttling_task(PurgeThrottlingType::PURGE_BY_GET_MC_REQ)); EXPECT_EQ(OB_SUCCESS, submit_log(leader, 2, leader_idx, log_entry_size)); EXPECT_NE(0, allocator->flying_log_task_); EXPECT_NE(0, allocator->flying_meta_task_); leader.get_palf_handle_impl()->log_engine_.palf_epoch_++; EXPECT_EQ(OB_SUCCESS, submit_log(leader, 10, leader_idx)); EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, leader_idx, log_entry_size)); EXPECT_EQ(OB_SUCCESS, submit_log(leader, 10, leader_idx)); EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, leader_idx, log_entry_size)); leader.get_palf_handle_impl()->log_engine_.palf_epoch_++; consume_cond.cond_.signal(); PALF_LOG(INFO, "runlin trace submit log 3"); IOTaskVerify verify(id, leader.get_palf_handle_impl()->log_engine_.palf_epoch_); EXPECT_EQ(OB_SUCCESS, iow->submit_io_task(&verify)); while (verify.count_ == 0) { PALF_LOG(INFO, "queue size is not zero", "size", iow->queue_.size()); sleep(1); } EXPECT_EQ(0, allocator->flying_log_task_); EXPECT_EQ(0, allocator->flying_meta_task_); need_stop = true; throttling_th.join(); } } TEST_F(TestObSimpleLogClusterSingleReplica, test_log_service_interface) { SET_CASE_LOG_FILE(TEST_NAME, "test_log_service_interface"); int64_t id = ATOMIC_AAF(&palf_id_, 1); ObSimpleLogServer *log_server = dynamic_cast(get_cluster()[0]); ASSERT_NE(nullptr, log_server); ObLogService *log_service = &log_server->log_service_; ObTenantRole tenant_role; tenant_role.value_ = 
ObTenantRole::Role::PRIMARY_TENANT; PalfBaseInfo palf_base_info; palf_base_info.generate_by_default(); ObLogHandler log_handler; ObLogRestoreHandler restore_handler; ObLogApplyService *apply_service = &log_service->apply_service_; ObReplicaType replica_type; ObLSID ls_id(id); ObApplyStatus *apply_status = nullptr; ASSERT_NE(nullptr, apply_status = static_cast(mtl_malloc(sizeof(ObApplyStatus), "mittest"))); new (apply_status) ObApplyStatus(); apply_status->inc_ref(); EXPECT_EQ(OB_SUCCESS, log_service->start()); EXPECT_EQ(OB_SUCCESS, apply_service->apply_status_map_.insert(ls_id, apply_status)); apply_service->is_running_ = true; EXPECT_EQ(OB_ENTRY_EXIST, log_service->create_ls(ls_id, REPLICA_TYPE_FULL, tenant_role, palf_base_info, true, log_handler, restore_handler)); bool is_exist = false; EXPECT_EQ(OB_SUCCESS, log_service->check_palf_exist(ls_id, is_exist)); EXPECT_EQ(is_exist, false); EXPECT_EQ(OB_ENTRY_NOT_EXIST, apply_service->apply_status_map_.erase(ls_id)); EXPECT_EQ(OB_SUCCESS, log_service->create_ls(ls_id, REPLICA_TYPE_FULL, tenant_role, palf_base_info, true, log_handler, restore_handler)); EXPECT_EQ(OB_ENTRY_EXIST, log_service->create_ls(ls_id, REPLICA_TYPE_FULL, tenant_role, palf_base_info, true, log_handler, restore_handler)); EXPECT_EQ(OB_SUCCESS, log_service->check_palf_exist(ls_id, is_exist)); EXPECT_EQ(is_exist, true); const char *log_dir = log_service->palf_env_->palf_env_impl_.log_dir_; bool result = false; EXPECT_EQ(OB_SUCCESS, FileDirectoryUtils::is_empty_directory(log_dir, result)); EXPECT_EQ(false, result); EXPECT_EQ(OB_SUCCESS, log_service->remove_ls(ls_id, log_handler, restore_handler)); EXPECT_EQ(OB_SUCCESS, log_service->check_palf_exist(ls_id, is_exist)); } TEST_F(TestObSimpleLogClusterSingleReplica, test_raw_write_concurrent_lsn) { SET_CASE_LOG_FILE(TEST_NAME, "test_raw_write_concurrent_lsn"); int64_t id = ATOMIC_AAF(&palf_id_, 1); OB_LOGGER.set_log_level("TRACE"); int64_t leader_idx = 0; PalfHandleImplGuard leader; PalfHandleImplGuard 
raw_write_leader; EXPECT_EQ(OB_SUCCESS, create_paxos_group(id, leader_idx, leader)); PalfHandleImpl *palf_handle_impl = leader.palf_handle_impl_; const int64_t id_raw_write = ATOMIC_AAF(&palf_id_, 1); EXPECT_EQ(OB_SUCCESS, create_paxos_group(id_raw_write, leader_idx, raw_write_leader)); EXPECT_EQ(OB_SUCCESS, change_access_mode_to_raw_write(raw_write_leader)); EXPECT_EQ(OB_SUCCESS, submit_log(leader, 100, leader_idx, MAX_LOG_BASE_TYPE)); SCN max_scn1 = leader.palf_handle_impl_->get_max_scn(); LSN end_pos_of_log1 = leader.palf_handle_impl_->get_max_lsn(); EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(leader, leader.palf_handle_impl_->get_max_lsn())); ObSimpleLogServer *log_server = dynamic_cast(get_cluster()[leader_idx]); ASSERT_NE(nullptr, log_server); std::thread submit_log_t1([&]() { ObTenantEnv::set_tenant(log_server->get_tenant_base()); EXPECT_EQ(OB_ITER_END, read_and_submit_group_log(leader, raw_write_leader)); }); std::thread submit_log_t2([&]() { ObTenantEnv::set_tenant(log_server->get_tenant_base()); EXPECT_EQ(OB_ITER_END, read_and_submit_group_log(leader, raw_write_leader)); }); submit_log_t1.join(); submit_log_t2.join(); } } // namespace unittest } // namespace oceanbase int main(int argc, char **argv) { RUN_SIMPLE_LOG_CLUSTER_TEST(TEST_NAME); }