// Files
// oceanbase/mittest/logservice/test_ob_simple_log_single_replica_func.cpp
//
// 2049 lines
// 111 KiB
// C++

// owner: zjf225077
// owner group: log
/**
* Copyright (c) 2021 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#include "lib/ob_define.h"
#include "lib/ob_errno.h"
#include <cstdio>
#include <gtest/gtest.h>
#include <signal.h>
#include <stdexcept>
#define private public
#define protected public
#include "env/ob_simple_log_cluster_env.h"
#undef private
#undef protected
#include "logservice/palf/log_reader_utils.h"
#include "logservice/palf/log_define.h"
#include "logservice/palf/log_group_entry_header.h"
#include "logservice/palf/log_io_worker.h"
#include "logservice/palf/lsn.h"
#include <thread>
// Name of this test suite. It is assigned to ObSimpleLogClusterTestBase::test_name_
// below and passed to SET_CASE_LOG_FILE by every test case in this file.
const std::string TEST_NAME = "single_replica";
using namespace oceanbase::common;
using namespace oceanbase;
namespace oceanbase
{
using namespace logservice;
namespace logservice
{
// Test-local definition of ObLogService::start().
//
// Starts the sub-services one after another (apply, replay, role change, cdc,
// restore and, when OB_BUILD_ARBITRATION is defined, arbitration). The first
// failing step is logged and stops the sequence; its error code is returned.
// is_running_ is set only when every step succeeded.
//
// @return OB_SUCCESS on success, otherwise the error code of the failed step.
int ObLogService::start()
{
  int ret = OB_SUCCESS;
  // palf_env has been started in log_server.init()
  if (OB_FAIL(apply_service_.start())) {
    CLOG_LOG(WARN, "failed to start apply_service_", K(ret));
  }
  if (OB_SUCCESS == ret && OB_FAIL(replay_service_.start())) {
    CLOG_LOG(WARN, "failed to start replay_service_", K(ret));
  }
  if (OB_SUCCESS == ret && OB_FAIL(role_change_service_.start())) {
    CLOG_LOG(WARN, "failed to start role_change_service_", K(ret));
  }
  if (OB_SUCCESS == ret && OB_FAIL(cdc_service_.start())) {
    CLOG_LOG(WARN, "failed to start cdc_service_", K(ret));
  }
  if (OB_SUCCESS == ret && OB_FAIL(restore_service_.start())) {
    CLOG_LOG(WARN, "failed to start restore_service_", K(ret));
  }
#ifdef OB_BUILD_ARBITRATION
  if (OB_SUCCESS == ret && OB_FAIL(arb_service_.start())) {
    CLOG_LOG(WARN, "failed to start arb_service_", K(ret));
  }
#endif
  if (OB_SUCCESS == ret) {
    is_running_ = true;
    FLOG_INFO("ObLogService is started");
  }
  return ret;
}
}
namespace unittest
{
// Test fixture for the single-replica log cluster cases in this file.
//
// The constructor runs init() and throws std::runtime_error if it does not
// return OB_SUCCESS; the destructor runs destroy(). Both hooks are currently
// no-ops (init() always returns OB_SUCCESS).
class TestObSimpleLogClusterSingleReplica : public ObSimpleLogClusterTestEnv
{
public:
  TestObSimpleLogClusterSingleReplica() : ObSimpleLogClusterTestEnv()
  {
    int ret = init();
    if (OB_SUCCESS != ret) {
      // The message names this fixture so a failure can be attributed correctly.
      throw std::runtime_error("TestObSimpleLogClusterSingleReplica init failed");
    }
  }
  ~TestObSimpleLogClusterSingleReplica()
  {
    destroy();
  }
  // Fixture set-up hook; nothing to prepare at the moment.
  // @return OB_SUCCESS always.
  int init()
  {
    return OB_SUCCESS;
  }
  // Fixture tear-down hook; nothing to release at the moment.
  void destroy()
  {}
  // Not assigned anywhere in the fixture itself; zero-initialized so that it
  // never holds an indeterminate value.
  int64_t id_ = 0;
  PalfHandleImplGuard leader_;
};
// Definitions of the static configuration members of ObSimpleLogClusterTestBase
// for this test binary: one member, one node, no arbitration server and no
// shared storage.
int64_t ObSimpleLogClusterTestBase::member_cnt_ = 1;
int64_t ObSimpleLogClusterTestBase::node_cnt_ = 1;
std::string ObSimpleLogClusterTestBase::test_name_ = TEST_NAME;
bool ObSimpleLogClusterTestBase::need_add_arb_server_ = false;
bool ObSimpleLogClusterTestBase::need_shared_storage_ = false;
// Timeout handed to flashback() by the tests below; 3 * 1000 * 1000, i.e.
// 3 seconds if the unit is microseconds as the "_us" suffix suggests.
constexpr int64_t timeout_ts_us = 3 * 1000 * 1000;
// Payload size (2 MiB + 16 KiB) used by single_replica_flashback when it
// submits its large log entries.
int64_t log_entry_size = 2 * 1024 * 1024 + 16 * 1024;
// Verifies that the padding log at `padding_log_lsn` can be read back.
//
// The entry is read twice through a fresh PalfBufferIterator: once starting at
// the padding group entry itself and once starting right behind the group
// entry header, i.e. at the padding log entry. In both cases the first entry
// returned must be a padding log, must pass its integrity check and must carry
// `padding_scn`.
//
// @param leader           guard of the palf handle to read from
// @param padding_scn      SCN the padding entry is expected to have
// @param padding_log_lsn  LSN of the padding group entry
void read_padding_entry(PalfHandleImplGuard &leader, SCN padding_scn, LSN padding_log_lsn)
{
  auto expect_padding_entry_from = [&leader, &padding_scn](const LSN &start_lsn) {
    PalfBufferIterator iter;
    EXPECT_EQ(OB_SUCCESS, leader.get_palf_handle_impl()->alloc_palf_buffer_iterator(start_lsn, iter));
    EXPECT_EQ(OB_SUCCESS, iter.next());
    LogEntry entry;
    LSN entry_lsn;
    EXPECT_EQ(OB_SUCCESS, iter.get_entry(entry, entry_lsn));
    EXPECT_TRUE(entry.header_.is_padding_log_());
    EXPECT_TRUE(entry.check_integrity());
    EXPECT_EQ(padding_scn, entry.get_scn());
  };

  // Start reading from the padding group entry.
  expect_padding_entry_from(padding_log_lsn);
  // Start reading from the padding log entry.
  expect_padding_entry_from(padding_log_lsn+LogGroupEntryHeader::HEADER_SER_SIZE);
}
// Creates a paxos group, submits 100 logs to it and releases the handle.
// The actual delete_paxos_group() call is currently commented out.
TEST_F(TestObSimpleLogClusterSingleReplica, delete_paxos_group)
{
  // 10 GiB of log disk, also expressed as a number of physical palf blocks.
  constexpr unsigned long log_disk_bytes = 10*1024*1024*1024ul;
  update_server_log_disk(log_disk_bytes);
  update_disk_options(log_disk_bytes/palf::PALF_PHY_BLOCK_SIZE);
  SET_CASE_LOG_FILE(TEST_NAME, "delete_paxos_group");
  const int64_t id = ATOMIC_AAF(&palf_id_, 1);
  PALF_LOG(INFO, "start test delete_paxos_group", K(id));
  int64_t leader_idx = 0;
  {
    // The guard lives only inside this scope, so the handle is released
    // before the sleep below.
    unittest::PalfHandleImplGuard leader;
    EXPECT_EQ(OB_SUCCESS, create_paxos_group(id, leader_idx, leader));
    EXPECT_EQ(OB_SUCCESS, submit_log(leader, 100, leader_idx));
  }
  sleep(1);
  // EXPECT_EQ(OB_SUCCESS, delete_paxos_group(id));
  // TODO by yunlong: check log sync
  PALF_LOG(INFO, "end test delete_paxos_group", K(id));
}
// Exercises flashback on a single replica: flashback of a single log, flashback
// around a padding log at a block boundary, repeated flashback, flashback to a
// middle / greater / max scn, flashback while the replica is reconfirming, and
// finally flashback to SCN::min_scn() which empties the log.
TEST_F(TestObSimpleLogClusterSingleReplica, single_replica_flashback)
{
  SET_CASE_LOG_FILE(TEST_NAME, "single_replica_flashback");
  OB_LOGGER.set_log_level("INFO");
  const int64_t id = ATOMIC_AAF(&palf_id_, 1);
  int64_t leader_idx = 0;
  PALF_LOG(INFO, "start test single replica flashback", K(id));
  SCN max_scn;
  unittest::PalfHandleImplGuard leader;
  int64_t mode_version = INVALID_PROPOSAL_ID;
  SCN ref_scn;
  EXPECT_EQ(OB_SUCCESS, create_paxos_group(id, leader_idx, leader));
  {
    SCN tmp_scn;
    LSN tmp_lsn;
    // Submit one log, then flashback.
    EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, leader_idx, 100));
    EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(leader, leader.palf_handle_impl_->sw_.get_max_lsn()));
    tmp_scn = leader.palf_handle_impl_->get_max_scn();
    switch_append_to_flashback(leader, mode_version);
    EXPECT_EQ(OB_SUCCESS, leader.palf_handle_impl_->flashback(mode_version, SCN::minus(tmp_scn, 10), timeout_ts_us));
    // The log is expected to start at LSN(0) again.
    EXPECT_EQ(LSN(0), leader.palf_handle_impl_->get_max_lsn());
    EXPECT_EQ(SCN::minus(tmp_scn, 10), leader.palf_handle_impl_->get_max_scn());
    EXPECT_EQ(LSN(0), leader.palf_handle_impl_->log_engine_.log_storage_.log_tail_);
    // Flashback onto the PADDING log.
    switch_flashback_to_append(leader, mode_version);
    EXPECT_EQ(OB_SUCCESS, submit_log(leader, 31, leader_idx, log_entry_size));
    EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(leader, leader.palf_handle_impl_->sw_.get_max_lsn()));
    EXPECT_EQ(OB_ITER_END, read_log(leader));
    EXPECT_GT(LSN(PALF_BLOCK_SIZE), leader.palf_handle_impl_->sw_.get_max_lsn());
    int remained_log_size = LSN(PALF_BLOCK_SIZE) - leader.palf_handle_impl_->sw_.get_max_lsn();
    EXPECT_LT(remained_log_size, log_entry_size);
    int need_log_size = remained_log_size - 5*1024;
    PALF_LOG(INFO, "runlin trace print sw1", K(leader.palf_handle_impl_->sw_));
    // Make sure that fewer than 5KB remain at the end of the block.
    EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, leader_idx, need_log_size));
    PALF_LOG(INFO, "runlin trace print sw2", K(leader.palf_handle_impl_->sw_));
    SCN mid_scn;
    LogEntryHeader header;
    // There are 32 logs in total at this point.
    EXPECT_EQ(OB_SUCCESS, get_middle_scn(32, leader, mid_scn, header));
    EXPECT_EQ(OB_ITER_END, get_middle_scn(33, leader, mid_scn, header));
    EXPECT_GT(LSN(PALF_BLOCK_SIZE), leader.palf_handle_impl_->sw_.get_max_lsn());
    remained_log_size = LSN(PALF_BLOCK_SIZE) - leader.palf_handle_impl_->sw_.get_max_lsn();
    EXPECT_LT(remained_log_size, 5*1024);
    EXPECT_GT(remained_log_size, 0);
    // Write one 5KB log; it no longer fits into the remaining space of the block.
    LSN padding_log_lsn = leader.get_palf_handle_impl()->get_max_lsn();
    EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, leader_idx, 5*1024));
    EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(leader, leader.get_palf_handle_impl()->get_max_lsn()));
    // Verify that the padding entry can be read.
    {
      share::SCN padding_scn = leader.get_palf_handle_impl()->get_max_scn();
      padding_scn = padding_scn.minus(padding_scn, 1);
      read_padding_entry(leader, padding_scn, padding_log_lsn);
    }
    PALF_LOG(INFO, "runlin trace print sw3", K(leader.palf_handle_impl_->sw_));
    // The padding log counts as a log entry, so there are 34 logs now.
    EXPECT_EQ(OB_SUCCESS, get_middle_scn(33, leader, mid_scn, header));
    EXPECT_EQ(OB_SUCCESS, get_middle_scn(34, leader, mid_scn, header));
    EXPECT_EQ(OB_ITER_END, get_middle_scn(35, leader, mid_scn, header));
    EXPECT_LT(LSN(PALF_BLOCK_SIZE), leader.palf_handle_impl_->sw_.get_max_lsn());
    max_scn = leader.palf_handle_impl_->sw_.get_max_scn();
    EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(leader, leader.palf_handle_impl_->get_max_lsn()));
    switch_append_to_flashback(leader, mode_version);
    // Flashback to the tail of the padding log.
    tmp_scn = leader.palf_handle_impl_->get_max_scn();
    EXPECT_EQ(OB_SUCCESS, leader.palf_handle_impl_->flashback(mode_version, SCN::minus(tmp_scn, 1), timeout_ts_us));
    PALF_LOG(INFO, "flashback to padding tail");
    EXPECT_EQ(leader.palf_handle_impl_->get_max_lsn(), LSN(PALF_BLOCK_SIZE));
    EXPECT_EQ(OB_ITER_END, read_log(leader));
    // After the flashback there are 33 logs (including the padding log).
    EXPECT_EQ(OB_SUCCESS, get_middle_scn(33, leader, mid_scn, header));
    EXPECT_EQ(OB_ITER_END, get_middle_scn(34, leader, mid_scn, header));
    // Verify that the padding entry can still be read.
    {
      // max_scn is used unchanged here: the flashback above targeted
      // (previous max_scn - 1), which is the SCN the padding entry was
      // verified with before, so no further "minus 1" adjustment applies.
      share::SCN padding_scn = leader.get_palf_handle_impl()->get_max_scn();
      PALF_LOG(INFO, "begin read_padding_entry", K(padding_scn), K(padding_log_lsn));
      read_padding_entry(leader, padding_scn, padding_log_lsn);
    }
    // Flashback to the head of the padding log; 32 logs remain on disk.
    tmp_scn = leader.palf_handle_impl_->get_max_scn();
    EXPECT_EQ(OB_SUCCESS, leader.palf_handle_impl_->flashback(mode_version, SCN::minus(tmp_scn, 1), timeout_ts_us));
    EXPECT_LT(leader.palf_handle_impl_->get_max_lsn(), LSN(PALF_BLOCK_SIZE));
    EXPECT_EQ(OB_SUCCESS, get_middle_scn(32, leader, mid_scn, header));
    EXPECT_EQ(OB_ITER_END, get_middle_scn(33, leader, mid_scn, header));
    EXPECT_EQ(padding_log_lsn, leader.palf_handle_impl_->get_max_lsn());
    switch_flashback_to_append(leader, mode_version);
    EXPECT_EQ(OB_SUCCESS, submit_log(leader, 10, leader_idx, 1000));
    EXPECT_EQ(OB_SUCCESS, wait_lsn_until_flushed(LSN(PALF_BLOCK_SIZE), leader));
    EXPECT_EQ(OB_ITER_END, read_log(leader));
    switch_append_to_flashback(leader, mode_version);
    EXPECT_EQ(OB_SUCCESS, leader.palf_handle_impl_->flashback(mode_version, SCN::min_scn(), timeout_ts_us));
    EXPECT_EQ(LSN(0), leader.palf_handle_impl_->get_max_lsn());
    switch_flashback_to_append(leader, mode_version);
    ref_scn.convert_for_tx(10000);
    EXPECT_EQ(OB_SUCCESS, submit_log(leader, ref_scn, tmp_lsn, tmp_scn));
    LSN tmp_lsn1 = leader.palf_handle_impl_->get_max_lsn();
    ref_scn.convert_for_tx(50000);
    EXPECT_EQ(OB_SUCCESS, submit_log(leader, ref_scn, tmp_lsn, tmp_scn));
    sleep(1);
    wait_until_has_committed(leader, leader.palf_handle_impl_->sw_.get_max_lsn());
    switch_append_to_flashback(leader, mode_version);
    ref_scn.convert_for_tx(30000);
    EXPECT_EQ(OB_SUCCESS, leader.palf_handle_impl_->flashback(mode_version, ref_scn, timeout_ts_us));
    // Verify a repeated flashback task.
    EXPECT_EQ(OB_SUCCESS, leader.palf_handle_impl_->inner_flashback(ref_scn));
    EXPECT_EQ(tmp_lsn1, leader.palf_handle_impl_->log_engine_.log_storage_.log_tail_);
    // Verify flashback with a timestamp that is too small.
    ref_scn.convert_from_ts(1);
    EXPECT_EQ(OB_SUCCESS, leader.palf_handle_impl_->inner_flashback(ref_scn));
    EXPECT_GT(tmp_lsn1, leader.palf_handle_impl_->log_engine_.log_storage_.log_tail_);
    CLOG_LOG(INFO, "runlin trace 3");
  }
  switch_flashback_to_append(leader, mode_version);
  EXPECT_EQ(OB_SUCCESS, submit_log(leader, 300, leader_idx));
  wait_until_has_committed(leader, leader.palf_handle_impl_->sw_.get_max_lsn());
  EXPECT_EQ(OB_ITER_END, read_log(leader));
  // Flashback to a log in the middle.
  // 1. Check that the log tail of log_storage agrees with the sliding window.
  switch_append_to_flashback(leader, mode_version);
  LogEntryHeader header_origin;
  EXPECT_EQ(OB_SUCCESS, get_middle_scn(200, leader, max_scn, header_origin));
  EXPECT_EQ(OB_SUCCESS, leader.palf_handle_impl_->flashback(mode_version, max_scn, timeout_ts_us));
  LogEntryHeader header_new;
  SCN new_scn;
  EXPECT_EQ(OB_SUCCESS, get_middle_scn(200, leader, new_scn, header_new));
  EXPECT_EQ(new_scn, max_scn);
  // The header read after the flashback must carry the same data checksum as
  // the one read before it.
  EXPECT_EQ(header_origin.data_checksum_, header_new.data_checksum_);
  switch_flashback_to_append(leader, mode_version);
  LSN new_log_tail = leader.palf_handle_impl_->log_engine_.log_storage_.log_tail_;
  EXPECT_EQ(new_log_tail, leader.palf_handle_impl_->sw_.committed_end_lsn_);
  EXPECT_EQ(max_scn, leader.palf_handle_impl_->sw_.last_slide_scn_);
  EXPECT_EQ(OB_ITER_END, read_log(leader));
  // Verify that logs can still be submitted after the flashback.
  EXPECT_EQ(OB_SUCCESS, submit_log(leader, 500, leader_idx));
  wait_until_has_committed(leader, leader.palf_handle_impl_->sw_.get_max_lsn());
  EXPECT_EQ(OB_ITER_END, read_log(leader));
  // Flashback again to the previous flashback point.
  switch_append_to_flashback(leader, mode_version);
  EXPECT_EQ(OB_SUCCESS, leader.palf_handle_impl_->flashback(mode_version, max_scn, timeout_ts_us));
  switch_flashback_to_append(leader, mode_version);
  EXPECT_EQ(new_log_tail, leader.palf_handle_impl_->sw_.committed_end_lsn_);
  EXPECT_EQ(max_scn, leader.palf_handle_impl_->sw_.last_slide_scn_);
  EXPECT_EQ(OB_ITER_END, read_log(leader));
  EXPECT_EQ(OB_SUCCESS, submit_log(leader, 500, leader_idx));
  wait_until_has_committed(leader, leader.palf_handle_impl_->sw_.get_max_lsn());
  EXPECT_EQ(OB_ITER_END, read_log(leader));
  // Flashback again, this time to a point among the logs that were submitted
  // after the previous flashback.
  EXPECT_EQ(OB_SUCCESS, get_middle_scn(634, leader, max_scn, header_origin));
  switch_append_to_flashback(leader, mode_version);
  EXPECT_EQ(OB_SUCCESS, leader.palf_handle_impl_->flashback(mode_version, max_scn, timeout_ts_us));
  switch_flashback_to_append(leader, mode_version);
  new_log_tail = leader.palf_handle_impl_->log_engine_.log_storage_.log_tail_;
  EXPECT_EQ(max_scn, leader.palf_handle_impl_->sw_.last_slide_scn_);
  EXPECT_EQ(OB_ITER_END, read_log(leader));
  EXPECT_EQ(OB_SUCCESS, submit_log(leader, 300, leader_idx));
  wait_until_has_committed(leader, leader.palf_handle_impl_->sw_.get_max_lsn());
  EXPECT_EQ(OB_ITER_END, read_log(leader));
  PALF_LOG(INFO, "flashback to middle success");
  // Flashback to a greater scn than anything in the log.
  max_scn = leader.palf_handle_impl_->get_end_scn();
  new_log_tail = leader.palf_handle_impl_->log_engine_.log_storage_.log_tail_;
  switch_append_to_flashback(leader, mode_version);
  EXPECT_EQ(OB_SUCCESS, leader.palf_handle_impl_->flashback(mode_version, SCN::plus(max_scn, 1000000000000), timeout_ts_us));
  switch_flashback_to_append(leader, mode_version);
  new_log_tail = leader.palf_handle_impl_->log_engine_.log_storage_.log_tail_;
  EXPECT_EQ(new_log_tail.val_, leader.palf_handle_impl_->sw_.committed_end_lsn_.val_);
  EXPECT_EQ(max_scn, leader.palf_handle_impl_->sw_.last_slide_scn_);
  EXPECT_EQ(OB_ITER_END, read_log(leader));
  PALF_LOG(INFO, "flashback to greater success");
  EXPECT_EQ(OB_SUCCESS, submit_log(leader, 300, leader_idx));
  wait_until_has_committed(leader, leader.palf_handle_impl_->sw_.get_max_lsn());
  new_log_tail = leader.palf_handle_impl_->get_max_lsn();
  max_scn = leader.palf_handle_impl_->get_max_scn();
  PALF_LOG(INFO, "runlin trace 3", K(new_log_tail), K(max_scn));
  switch_append_to_flashback(leader, mode_version);
  LSN new_log_tail_1 = leader.palf_handle_impl_->get_end_lsn();
  SCN max_scn1 = leader.palf_handle_impl_->get_end_scn();
  PALF_LOG(INFO, "runlin trace 4", K(new_log_tail), K(max_scn), K(new_log_tail_1), K(max_scn1));
  EXPECT_EQ(OB_SUCCESS, leader.palf_handle_impl_->flashback(mode_version, max_scn, timeout_ts_us));
  LSN log_tail_after_flashback = leader.palf_handle_impl_->get_end_lsn();
  SCN max_ts_after_flashback = leader.palf_handle_impl_->get_end_scn();
  PALF_LOG(INFO, "runlin trace 5", K(log_tail_after_flashback), K(max_ts_after_flashback));
  switch_flashback_to_append(leader, mode_version);
  EXPECT_EQ(new_log_tail, leader.palf_handle_impl_->sw_.committed_end_lsn_);
  EXPECT_EQ(OB_ITER_END, read_log(leader));
  PALF_LOG(INFO, "flashback to max_scn success");
  // Flashback again to the max_scn recorded before the following logs are submitted.
  EXPECT_EQ(OB_SUCCESS, submit_log(leader, 300, leader_idx));
  wait_until_has_committed(leader, leader.palf_handle_impl_->sw_.get_max_lsn());
  LSN curr_lsn = leader.palf_handle_impl_->get_end_lsn();
  EXPECT_NE(curr_lsn, new_log_tail);
  EXPECT_EQ(OB_ITER_END, read_log(leader));
  switch_append_to_flashback(leader, mode_version);
  EXPECT_EQ(OB_SUCCESS, leader.palf_handle_impl_->flashback(mode_version, max_scn, timeout_ts_us));
  switch_flashback_to_append(leader, mode_version);
  EXPECT_EQ(new_log_tail, leader.palf_handle_impl_->get_end_lsn());
  EXPECT_EQ(OB_ITER_END, read_log(leader));
  // flashback reconfirming leader
  EXPECT_EQ(OB_SUCCESS, submit_log(leader, 100, leader_idx));
  wait_until_has_committed(leader, leader.palf_handle_impl_->sw_.get_max_lsn());
  SCN flashback_scn = leader.palf_handle_impl_->get_max_scn();
  EXPECT_EQ(OB_SUCCESS, submit_log(leader, 100, leader_idx));
  wait_until_has_committed(leader, leader.palf_handle_impl_->sw_.get_max_lsn());
  EXPECT_EQ(OB_ITER_END, read_log(leader));
  switch_append_to_flashback(leader, mode_version);
  // Stop the log loop thread before role_/state_ are overwritten by hand below;
  // it is started again once they have been restored.
  dynamic_cast<palf::PalfEnvImpl*>(get_cluster()[0]->get_palf_env())->log_loop_thread_.stop();
  dynamic_cast<palf::PalfEnvImpl*>(get_cluster()[0]->get_palf_env())->log_loop_thread_.wait();
  leader.palf_handle_impl_->state_mgr_.role_ = LEADER;
  leader.palf_handle_impl_->state_mgr_.state_ = RECONFIRM;
  // While the replica is a reconfirming leader, flashback must be refused
  // with OB_EAGAIN and leave the log untouched.
  EXPECT_EQ(OB_EAGAIN, leader.palf_handle_impl_->flashback(mode_version, max_scn, timeout_ts_us));
  EXPECT_GT(leader.palf_handle_impl_->sw_.get_max_scn(), flashback_scn);
  leader.palf_handle_impl_->state_mgr_.role_ = FOLLOWER;
  leader.palf_handle_impl_->state_mgr_.state_ = ObReplicaState::ACTIVE;
  EXPECT_EQ(OB_SUCCESS, leader.palf_handle_impl_->flashback(mode_version, max_scn, timeout_ts_us));
  EXPECT_LT(leader.palf_handle_impl_->sw_.get_max_scn(), flashback_scn);
  EXPECT_EQ(new_log_tail, leader.palf_handle_impl_->get_end_lsn());
  EXPECT_EQ(OB_ITER_END, read_log(leader));
  leader.palf_handle_impl_->state_mgr_.role_ = LEADER;
  leader.palf_handle_impl_->state_mgr_.state_ = ObReplicaState::ACTIVE;
  dynamic_cast<palf::PalfEnvImpl*>(get_cluster()[0]->get_palf_env())->log_loop_thread_.start();
  switch_flashback_to_append(leader, mode_version);
  // Clear all data.
  wait_until_has_committed(leader, leader.palf_handle_impl_->sw_.get_max_lsn());
  switch_append_to_flashback(leader, mode_version);
  EXPECT_EQ(OB_SUCCESS, leader.palf_handle_impl_->flashback(mode_version, SCN::min_scn(), timeout_ts_us));
  EXPECT_EQ(LSN(0), leader.palf_handle_impl_->get_max_lsn());
  EXPECT_EQ(SCN::min_scn(), leader.palf_handle_impl_->get_max_scn());
  switch_flashback_to_append(leader, mode_version);
  EXPECT_EQ(OB_ITER_END, read_log(leader));
  PALF_LOG(INFO, "flashback to 0 success");
  leader.reset();
  delete_paxos_group(id);
}
// Checks that flashback results survive restart_paxos_groups(): the test
// alternates between flashback steps (to a middle log, across block files, to
// the end of a block file, to the head of a padding log) and restarts, and
// verifies after each restart that the log can be read and appended to.
TEST_F(TestObSimpleLogClusterSingleReplica, single_replica_flashback_restart)
{
SET_CASE_LOG_FILE(TEST_NAME, "single_replica_flashback_restart");
OB_LOGGER.set_log_level("INFO");
const int64_t id = ATOMIC_AAF(&palf_id_, 1);
int64_t leader_idx = 0;
SCN max_scn = SCN::min_scn();
SCN ref_scn;
int64_t mode_version = INVALID_PROPOSAL_ID;
// Phase 1: flashback to a log in the middle, then append again.
{
unittest::PalfHandleImplGuard leader;
EXPECT_EQ(OB_SUCCESS, create_paxos_group(id, leader_idx, leader));
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1000, leader_idx, 1000));
LogEntryHeader header_origin;
EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(leader, leader.palf_handle_impl_->get_max_lsn()));
// Remember scn and header reported for position 323 (presumably the 323rd
// log) so they can be compared after the flashback.
EXPECT_EQ(OB_SUCCESS, get_middle_scn(323, leader, max_scn, header_origin));
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 100, leader_idx, 1000));
wait_until_has_committed(leader, leader.palf_handle_impl_->sw_.get_max_lsn());
EXPECT_EQ(OB_ITER_END, read_log(leader));
switch_append_to_flashback(leader, mode_version);
EXPECT_EQ(OB_SUCCESS, leader.palf_handle_impl_->flashback(mode_version, max_scn, timeout_ts_us));
LogEntryHeader header_new;
SCN new_scn;
// After the flashback exactly 323 logs are expected to remain, and the last
// one must match what was recorded before.
EXPECT_EQ(OB_SUCCESS, get_middle_scn(323, leader, new_scn, header_new));
EXPECT_EQ(new_scn, max_scn);
EXPECT_EQ(header_origin.data_checksum_, header_new.data_checksum_);
EXPECT_EQ(OB_ITER_END, get_middle_scn(324, leader, new_scn, header_new));
switch_flashback_to_append(leader, mode_version);
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1000, leader_idx, 1000));
wait_until_has_committed(leader, leader.palf_handle_impl_->sw_.get_max_lsn());
EXPECT_EQ(OB_SUCCESS, get_middle_scn(1323, leader, new_scn, header_new));
EXPECT_EQ(OB_ITER_END, get_middle_scn(1324, leader, new_scn, header_new));
EXPECT_EQ(OB_ITER_END, read_log(leader));
}
EXPECT_EQ(OB_SUCCESS, restart_paxos_groups());
{
// Verify the restart scenario.
PalfHandleImplGuard new_leader;
int64_t curr_mode_version = INVALID_PROPOSAL_ID;
AccessMode curr_access_mode = AccessMode::INVALID_ACCESS_MODE;
EXPECT_EQ(OB_SUCCESS, get_leader(id, new_leader, leader_idx));
EXPECT_EQ(OB_SUCCESS, new_leader.palf_handle_impl_->get_access_mode(curr_mode_version, curr_access_mode));
// The mode version read after the restart must equal the one held before it.
EXPECT_EQ(curr_mode_version, mode_version);
EXPECT_EQ(OB_SUCCESS, submit_log(new_leader, 1000, leader_idx, 1000));
wait_until_has_committed(new_leader, new_leader.palf_handle_impl_->sw_.get_max_lsn());
EXPECT_EQ(OB_ITER_END, read_log(new_leader));
ref_scn.convert_for_tx(1000);
LogEntryHeader header_new;
LogStorage *log_storage = &new_leader.palf_handle_impl_->log_engine_.log_storage_;
block_id_t max_block_id = log_storage->block_mgr_.max_block_id_;
EXPECT_EQ(OB_SUCCESS, get_middle_scn(1329, new_leader, max_scn, header_new));
// Restart after a flashback that crosses block files.
EXPECT_EQ(OB_SUCCESS, submit_log(new_leader, 33, leader_idx, MAX_LOG_BODY_SIZE));
wait_until_has_committed(new_leader, new_leader.palf_handle_impl_->sw_.get_max_lsn());
EXPECT_LE(max_block_id, log_storage->block_mgr_.max_block_id_);
switch_append_to_flashback(new_leader, mode_version);
EXPECT_EQ(OB_SUCCESS, new_leader.palf_handle_impl_->flashback(mode_version, max_scn, timeout_ts_us));
EXPECT_GE(max_block_id, log_storage->block_mgr_.max_block_id_);
switch_flashback_to_append(new_leader, mode_version);
}
EXPECT_EQ(OB_SUCCESS, restart_paxos_groups());
{
PalfHandleImplGuard new_leader;
int64_t curr_mode_version = INVALID_PROPOSAL_ID;
AccessMode curr_access_mode = AccessMode::INVALID_ACCESS_MODE;
EXPECT_EQ(OB_SUCCESS, get_leader(id, new_leader, leader_idx));
EXPECT_EQ(OB_SUCCESS, new_leader.palf_handle_impl_->get_access_mode(curr_mode_version, curr_access_mode));
// Flashback to the tail of a block file.
EXPECT_EQ(OB_SUCCESS, submit_log(new_leader, 65, leader_idx, MAX_LOG_BODY_SIZE));
wait_until_has_committed(new_leader, new_leader.palf_handle_impl_->sw_.get_max_lsn());
switch_append_to_flashback(new_leader, mode_version);
LSN lsn(PALF_BLOCK_SIZE);
LogStorage *log_storage = &new_leader.palf_handle_impl_->log_engine_.log_storage_;
SCN block_end_scn;
{
// Walk the group entries of the range [LSN(0), LSN(PALF_BLOCK_SIZE)) and keep
// the scn of the last entry that was fetched.
PalfGroupBufferIterator iterator;
auto get_file_end_lsn = [](){
return LSN(PALF_BLOCK_SIZE);
};
EXPECT_EQ(OB_SUCCESS, iterator.init(LSN(0), get_file_end_lsn, log_storage));
LogGroupEntry entry;
// NOTE: this `lsn` shadows the outer `lsn` declared above; the outer one
// keeps the value LSN(PALF_BLOCK_SIZE).
LSN lsn;
while (OB_SUCCESS == iterator.next()) {
EXPECT_EQ(OB_SUCCESS, iterator.get_entry(entry, lsn));
}
block_end_scn = entry.get_scn();
}
EXPECT_EQ(OB_SUCCESS, new_leader.palf_handle_impl_->flashback(mode_version, block_end_scn, timeout_ts_us));
// The log tail must now sit exactly at LSN(PALF_BLOCK_SIZE).
EXPECT_EQ(lsn, log_storage->log_tail_);
EXPECT_EQ(OB_ITER_END, read_log(new_leader));
}
EXPECT_EQ(OB_SUCCESS, restart_paxos_groups());
// Continue submitting logs after the restart.
{
PalfHandleImplGuard new_leader;
EXPECT_EQ(OB_SUCCESS, get_leader(id, new_leader, leader_idx));
switch_flashback_to_append(new_leader, mode_version);
EXPECT_EQ(true, 0 == lsn_2_offset(new_leader.get_palf_handle_impl()->get_max_lsn(), PALF_BLOCK_SIZE));
share::SCN padding_scn = new_leader.get_palf_handle_impl()->get_max_scn();
EXPECT_EQ(OB_SUCCESS, submit_log(new_leader, 100, leader_idx));
wait_until_has_committed(new_leader, new_leader.palf_handle_impl_->sw_.get_max_lsn());
EXPECT_EQ(OB_ITER_END, read_log(new_leader));
switch_append_to_flashback(new_leader, mode_version);
// Restart after flashing back to the head of the padding log.
EXPECT_EQ(OB_SUCCESS, new_leader.palf_handle_impl_->flashback(mode_version, padding_scn.minus(padding_scn, 1), timeout_ts_us));
EXPECT_EQ(true, 0 != lsn_2_offset(new_leader.get_palf_handle_impl()->get_max_lsn(), PALF_BLOCK_SIZE));
new_leader.reset();
}
EXPECT_EQ(OB_SUCCESS, restart_paxos_groups());
// After the restart, submit a log that must not produce a padding log.
{
PalfHandleImplGuard new_leader;
EXPECT_EQ(OB_SUCCESS, get_leader(id, new_leader, leader_idx));
LSN padding_start_lsn = new_leader.get_palf_handle_impl()->get_max_lsn();
EXPECT_EQ(true, 0 != lsn_2_offset(new_leader.get_palf_handle_impl()->get_max_lsn(), PALF_BLOCK_SIZE));
const int64_t remained_size = PALF_BLOCK_SIZE - lsn_2_offset(new_leader.get_palf_handle_impl()->get_max_lsn(), PALF_BLOCK_SIZE);
EXPECT_GE(remained_size, 0);
// Size the payload as the remaining space minus one group entry header and
// one log entry header.
const int64_t group_entry_body_size = remained_size - LogGroupEntryHeader::HEADER_SER_SIZE - LogEntryHeader::HEADER_SER_SIZE;
PALF_LOG(INFO, "runlin trace print remained_size", K(remained_size), K(group_entry_body_size));
switch_flashback_to_append(new_leader, mode_version);
EXPECT_EQ(OB_SUCCESS, submit_log(new_leader, 1, leader_idx, group_entry_body_size));
EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(new_leader, new_leader.get_palf_handle_impl()->get_max_lsn()));
// The entry read at the old tail position must be a regular (non-padding) log.
PalfBufferIterator iterator;
EXPECT_EQ(OB_SUCCESS, new_leader.get_palf_handle_impl()->alloc_palf_buffer_iterator(padding_start_lsn, iterator));
EXPECT_EQ(OB_SUCCESS, iterator.next());
LogEntry log_entry;
LSN check_lsn;
EXPECT_EQ(OB_SUCCESS, iterator.get_entry(log_entry, check_lsn));
EXPECT_EQ(check_lsn, padding_start_lsn + LogGroupEntryHeader::HEADER_SER_SIZE);
EXPECT_EQ(false, log_entry.header_.is_padding_log_());
EXPECT_EQ(true, log_entry.check_integrity());
new_leader.reset();
}
EXPECT_EQ(OB_SUCCESS, restart_paxos_groups());
// Continue submitting logs after the restart.
{
PalfHandleImplGuard new_leader;
EXPECT_EQ(OB_SUCCESS, get_leader(id, new_leader, leader_idx));
// The previous log is expected to have ended exactly on a block boundary.
EXPECT_EQ(true, 0 == lsn_2_offset(new_leader.get_palf_handle_impl()->get_max_lsn(), PALF_BLOCK_SIZE));
EXPECT_EQ(OB_SUCCESS, submit_log(new_leader, 100, leader_idx, 1000));
EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(new_leader, new_leader.get_palf_handle_impl()->get_max_lsn()));
EXPECT_EQ(OB_ITER_END, read_log(new_leader));
}
delete_paxos_group(id);
}
// Shrinks the current writable block file with ftruncate() behind the log
// engine's back, restarts, and checks that the file has PALF_PHY_BLOCK_SIZE
// bytes again and that the log remains usable. Afterwards it truncates the log
// at LSN(PALF_BLOCK_SIZE) and restarts once more.
TEST_F(TestObSimpleLogClusterSingleReplica, test_truncate_failed)
{
  SET_CASE_LOG_FILE(TEST_NAME, "test_truncate_failed");
  int64_t id = ATOMIC_AAF(&palf_id_, 1);
  int64_t leader_idx = 0;
  int64_t file_size = 0;
  char block_path[OB_MAX_FILE_NAME_LENGTH] = {'\0'};
  {
    PalfHandleImplGuard leader;
    EXPECT_EQ(OB_SUCCESS, create_paxos_group(id, leader_idx, leader));
    EXPECT_EQ(OB_SUCCESS, submit_log(leader, 10, id, 1000));
    wait_lsn_until_flushed(leader.palf_handle_impl_->get_max_lsn(), leader);
    auto &log_storage = leader.palf_handle_impl_->log_engine_.log_storage_;
    // Log tail after the first batch; the file is cut back to this position.
    LSN max_lsn = log_storage.log_tail_;
    EXPECT_EQ(OB_SUCCESS, submit_log(leader, 10, id, 1000));
    wait_lsn_until_flushed(leader.palf_handle_impl_->get_max_lsn(), leader);
    auto &block_mgr = log_storage.block_mgr_;
    int64_t fd = block_mgr.curr_writable_handler_.io_fd_.second_id_;
    block_id_t block_id = block_mgr.curr_writable_block_id_;
    char *log_dir = block_mgr.log_dir_;
    convert_to_normal_block(log_dir, block_id, block_path, OB_MAX_FILE_NAME_LENGTH);
    EXPECT_EQ(OB_ITER_END, read_log(leader));
    PALF_LOG_RET(ERROR, OB_SUCCESS, "truncate pos", K(max_lsn));
    // Resulting file size: the first batch's log tail plus MAX_INFO_BLOCK_SIZE.
    const auto truncated_size = max_lsn.val_+MAX_INFO_BLOCK_SIZE;
    EXPECT_EQ(0, ftruncate(fd, truncated_size));
    FileDirectoryUtils::get_file_size(block_path, file_size);
    EXPECT_EQ(file_size, truncated_size);
  }
  PalfHandleImplGuard leader;
  EXPECT_EQ(OB_SUCCESS, restart_paxos_groups());
  // After the restart the block file is expected to have its full physical size again.
  FileDirectoryUtils::get_file_size(block_path, file_size);
  EXPECT_EQ(file_size, PALF_PHY_BLOCK_SIZE);
  get_leader(id, leader, leader_idx);
  EXPECT_EQ(OB_SUCCESS, submit_log(leader, 32, id, MAX_LOG_BODY_SIZE));
  wait_lsn_until_flushed(leader.palf_handle_impl_->get_max_lsn(), leader);
  EXPECT_EQ(OB_ITER_END, read_log(leader));
  // Verify a restart after truncating at the end of a block file.
  auto &log_engine = leader.palf_handle_impl_->log_engine_;
  EXPECT_EQ(OB_SUCCESS, log_engine.truncate(LSN(PALF_BLOCK_SIZE)));
  EXPECT_EQ(LSN(PALF_BLOCK_SIZE), log_engine.log_storage_.log_tail_);
  leader.reset();
  EXPECT_EQ(OB_SUCCESS, restart_paxos_groups());
}
TEST_F(TestObSimpleLogClusterSingleReplica, test_meta)
{
SET_CASE_LOG_FILE(TEST_NAME, "test_meta");
int64_t id = ATOMIC_AAF(&palf_id_, 1);
int64_t leader_idx = 0;
LSN upper_aligned_log_tail;
{
PalfHandleImplGuard leader;
EXPECT_EQ(OB_SUCCESS, create_paxos_group(id, leader_idx, leader));
sleep(1);
// 测试meta文件刚好写满的重启场景
LogEngine *log_engine = &leader.palf_handle_impl_->log_engine_;
LogStorage *log_meta_storage = &log_engine->log_meta_storage_;
LSN log_meta_tail = log_meta_storage->log_tail_;
upper_aligned_log_tail.val_ = (lsn_2_block(log_meta_tail, PALF_META_BLOCK_SIZE) + 1) * PALF_META_BLOCK_SIZE;
int64_t delta = upper_aligned_log_tail - log_meta_tail;
int64_t delta_cnt = delta / MAX_META_ENTRY_SIZE;
while (delta_cnt-- > 0) {
log_engine->append_log_meta_(log_engine->log_meta_);
}
EXPECT_EQ(upper_aligned_log_tail, log_meta_storage->log_tail_);
PALF_LOG_RET(ERROR, OB_SUCCESS, "runlin trace before restart", K(upper_aligned_log_tail), KPC(log_meta_storage));
}
EXPECT_EQ(OB_SUCCESS, restart_paxos_groups());
{
PalfHandleImplGuard leader;
EXPECT_EQ(OB_SUCCESS, get_leader(id, leader, leader_idx));
LogEngine *log_engine = &leader.palf_handle_impl_->log_engine_;
LogStorage *log_meta_storage = &log_engine->log_meta_storage_;
LSN log_meta_tail = log_meta_storage->log_tail_;
upper_aligned_log_tail.val_ = (lsn_2_block(log_meta_tail, PALF_META_BLOCK_SIZE) + 1) * PALF_META_BLOCK_SIZE;
int64_t delta = upper_aligned_log_tail - log_meta_tail;
int64_t delta_cnt = delta / MAX_META_ENTRY_SIZE;
while (delta_cnt-- > 0) {
log_engine->append_log_meta_(log_engine->log_meta_);
}
EXPECT_EQ(upper_aligned_log_tail, log_meta_storage->log_tail_);
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 32, id, MAX_LOG_BODY_SIZE));
sleep(1);
wait_lsn_until_flushed(leader.palf_handle_impl_->get_max_lsn(), leader);
block_id_t min_block_id, max_block_id;
EXPECT_EQ(OB_SUCCESS, log_meta_storage->get_block_id_range(min_block_id, max_block_id));
EXPECT_EQ(min_block_id, max_block_id);
}
}
TEST_F(TestObSimpleLogClusterSingleReplica, test_iterator)
{
SET_CASE_LOG_FILE(TEST_NAME, "test_iterator");
OB_LOGGER.set_log_level("TRACE");
const int64_t id = ATOMIC_AAF(&palf_id_, 1);
int64_t leader_idx = 0;
int64_t mode_version_v = 1;
int64_t *mode_version = &mode_version_v;
LSN end_lsn_v = LSN(100000000);
LSN *end_lsn = &end_lsn_v;
{
SCN max_scn_case1, max_scn_case2, max_scn_case3;
PalfHandleImplGuard leader;
PalfHandleImplGuard raw_write_leader;
EXPECT_EQ(OB_SUCCESS, create_paxos_group(id, leader_idx, leader));
PalfHandleImpl *palf_handle_impl = leader.palf_handle_impl_;
const int64_t id_raw_write = ATOMIC_AAF(&palf_id_, 1);
EXPECT_EQ(OB_SUCCESS, create_paxos_group(id_raw_write, leader_idx, raw_write_leader));
EXPECT_EQ(OB_SUCCESS, change_access_mode_to_raw_write(raw_write_leader));
int64_t count = 5;
// 提交1024条日志,记录max_scn,用于后续next迭代验证,case1
for (int i = 0; i < count; i++) {
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, id, 4*1024));
EXPECT_EQ(OB_SUCCESS, wait_lsn_until_flushed(leader.palf_handle_impl_->get_max_lsn(), leader));
}
max_scn_case1 = palf_handle_impl->get_max_scn();
// 提交5条日志,case1成功后,执行case2
for (int i = 0; i < count; i++) {
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, id, 4*1024));
EXPECT_EQ(OB_SUCCESS, wait_lsn_until_flushed(leader.palf_handle_impl_->get_max_lsn(), leader));
}
max_scn_case2 = palf_handle_impl->get_max_scn();
// 提交5条日志, case3, 验证next(replayable_point_scn, &next_log_min_scn, &bool)
std::vector<LSN> lsns;
std::vector<SCN> logts;
const int64_t log_size = 500;
auto submit_log_private =[this](PalfHandleImplGuard &leader,
const int64_t count,
const int64_t id,
const int64_t wanted_data_size,
std::vector<LSN> &lsn_array,
std::vector<SCN> &scn_array)-> int{
int ret = OB_SUCCESS;
lsn_array.resize(count);
scn_array.resize(count);
for (int i = 0; i < count && OB_SUCC(ret); i++) {
SCN ref_scn;
ref_scn.convert_from_ts(ObTimeUtility::current_time() + 10000000);
std::vector<LSN> tmp_lsn_array;
std::vector<SCN> tmp_log_scn_array;
if (OB_FAIL(submit_log_impl(leader, 1, id, wanted_data_size, ref_scn, tmp_lsn_array, tmp_log_scn_array))) {
} else {
lsn_array[i] = tmp_lsn_array[0];
scn_array[i] = tmp_log_scn_array[0];
wait_lsn_until_flushed(leader.palf_handle_impl_->get_max_lsn(), leader);
CLOG_LOG(INFO, "submit_log_private success", K(i), "scn", tmp_log_scn_array[0], K(ref_scn));
}
}
return ret;
};
EXPECT_EQ(OB_SUCCESS, submit_log_private(leader, count, id, log_size, lsns, logts));
max_scn_case3 = palf_handle_impl->get_max_scn();
EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(leader, palf_handle_impl->get_end_lsn()));
EXPECT_EQ(OB_ITER_END, read_and_submit_group_log(leader, raw_write_leader));
PalfHandleImpl *raw_write_palf_handle_impl = raw_write_leader.palf_handle_impl_;
EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(raw_write_leader, raw_write_palf_handle_impl->get_end_lsn()));
PalfBufferIterator iterator;
auto get_file_end_lsn = [&end_lsn]() -> LSN { return *end_lsn; };
auto get_mode_version = [&mode_version, &mode_version_v]() -> int64_t {
PALF_LOG(INFO, "runlin trace", K(*mode_version), K(mode_version_v));
return *mode_version;
};
EXPECT_EQ(OB_SUCCESS,
iterator.init(LSN(0), get_file_end_lsn, get_mode_version, &raw_write_palf_handle_impl->log_engine_.log_storage_));
EXPECT_EQ(OB_SUCCESS, iterator.next(max_scn_case1)); count--;
EXPECT_EQ(OB_ITER_END, iterator.next(SCN::base_scn()));
// case0: 验证group iterator迭代日志功能
EXPECT_EQ(OB_ITER_END, read_group_log(raw_write_leader, LSN(0)));
LSN curr_lsn = iterator.iterator_impl_.get_curr_read_lsn();
// case1:
// - 验证mode_version变化后,cache是否清空
// - replayable_point_scn是否生效
// 当mode version发生变化时,预期cache应该清空
// raw模式下,当replayable_point_scn很小时,直接返回OB_ITER_END
PALF_LOG(INFO, "runlin trace case1", K(mode_version_v), K(*mode_version), K(max_scn_case1));
// mode_version_v 为无效值时,预期不清空
mode_version_v = INVALID_PROPOSAL_ID;
end_lsn_v = curr_lsn;
EXPECT_FALSE(curr_lsn == iterator.iterator_storage_.end_lsn_);
EXPECT_FALSE(curr_lsn == iterator.iterator_storage_.start_lsn_);
EXPECT_EQ(OB_ITER_END, iterator.next(SCN::base_scn()));
// mode_version_v 比inital_mode_version小,预期不清空
mode_version_v = -1;
EXPECT_FALSE(curr_lsn == iterator.iterator_storage_.end_lsn_);
EXPECT_FALSE(curr_lsn == iterator.iterator_storage_.start_lsn_);
EXPECT_EQ(OB_ITER_END, iterator.next(SCN::base_scn()));
// 合理的mode_version_v,清空cache
mode_version_v = 100;
end_lsn_v = curr_lsn;
EXPECT_EQ(OB_ITER_END, iterator.next(SCN::base_scn()));
// cache清空,依赖上一次next操作
EXPECT_EQ(curr_lsn, iterator.iterator_storage_.start_lsn_);
EXPECT_EQ(curr_lsn, iterator.iterator_storage_.end_lsn_);
PALF_LOG(INFO, "runlin trace", K(iterator), K(max_scn_case1), K(curr_lsn));
end_lsn_v = LSN(1000000000);
// 当replayable_point_scn为max_log_ts,预期max_log_ts前的日志可以吐出5条日志
EXPECT_EQ(OB_SUCCESS, iterator.next(max_scn_case1)); count--;
while (count > 0) {
EXPECT_EQ(OB_SUCCESS, iterator.next(max_scn_case1));
count--;
}
EXPECT_EQ(OB_ITER_END, iterator.next(max_scn_case1));
// case2: next 功能是否正常
// 尝试读取后续的5条日志
count = 5;
PALF_LOG(INFO, "runlin trace case2", K(iterator), K(max_scn_case2));
while (count > 0) {
EXPECT_EQ(OB_SUCCESS, iterator.next(max_scn_case2));
count--;
}
// 此时的curr_entry已经是第三次提交日志的第一条日志日志(first_log)
// 由于该日志对应的时间戳比max_scn_case2大,因此不会吐出
// NB: 这里测试时,遇到过以下情况:case3的第一次 next后的EXPECT_EQ:
// curr_entry变为first_log后,在后续的测试中,尝试把file_end_lsn设置到
// fisrt_log之前,然后出现了一种情况,此时调用next(fist_log_ts, next_log_min_scn)后,
// next_log_min_scn被设置为first_scn+1,对外表现为:尽管存在first_log,但外部在
// 没有看到first_log之前就已经next_log_min_scn一定大于first_scn
//
// 实际上,这种情况是不会出现的,因为file_end_lsn不会回退的
EXPECT_EQ(OB_ITER_END, iterator.next(max_scn_case2));
//case3: next(replayable_point_scn, &next_log_min_scn)
PALF_LOG(INFO, "runlin trace case3", K(iterator), K(max_scn_case3), K(end_lsn_v), K(max_scn_case2));
SCN first_scn = logts[0];
// 在使用next(replayable_point_scn, &next_log_min_scn)接口时
// 我们禁止使用LogEntry的头作为迭代器终点
LSN first_log_start_lsn = lsns[0] - sizeof(LogGroupEntryHeader);
LSN first_log_end_lsn = lsns[0]+log_size+sizeof(LogEntryHeader);
SCN next_log_min_scn;
bool iterate_end_by_replayable_point = false;
count = 5;
// 模拟提前达到文件终点, 没有读过新日志,因此next_log_min_scn为prev_entry_scn_+1
end_lsn_v = first_log_start_lsn - 1;
CLOG_LOG(INFO, "runlin trace 1", K(iterator), K(end_lsn_v), KPC(end_lsn), K(max_scn_case2), K(first_scn));
EXPECT_EQ(OB_ITER_END, iterator.next(SCN::plus(first_scn, 10000), next_log_min_scn, iterate_end_by_replayable_point));
// file_end_lsn尽管回退了,但curr_entry_已经没有被读取过, 因此next_log_min_scn依旧为first_scn
EXPECT_EQ(SCN::plus(iterator.iterator_impl_.prev_entry_scn_, 1), next_log_min_scn);
EXPECT_EQ(iterate_end_by_replayable_point, false);
CLOG_LOG(INFO, "runlin trace 3.1", K(iterator), K(end_lsn_v), KPC(end_lsn));
EXPECT_EQ(first_log_start_lsn,
iterator.iterator_impl_.log_storage_->get_lsn(iterator.iterator_impl_.curr_read_pos_));
// 读取一条日志成功,next_log_min_scn会被重置
// curr_entry为fisrt_log_ts对应的log
end_lsn_v = first_log_end_lsn;
CLOG_LOG(INFO, "runlin trace 2", K(iterator), K(end_lsn_v), KPC(end_lsn));
EXPECT_EQ(OB_SUCCESS, iterator.next(first_scn, next_log_min_scn, iterate_end_by_replayable_point)); count--;
// iterator 返回成功,next_log_min_scn应该为OB_INVALID_TIMESTAMP
EXPECT_EQ(next_log_min_scn.is_valid(), false);
// iterator中的prev_entry_scn_被设置为first_scn
EXPECT_EQ(iterator.iterator_impl_.prev_entry_scn_, first_scn);
CLOG_LOG(INFO, "runlin trace 3", K(iterator), K(end_lsn_v), KPC(end_lsn));
{
// 模拟提前达到文件终点, 此时文件终点为file_log_end_lsn
// 预期next_log_min_scn为first_scn对应的日志+1
SCN second_scn = logts[1];
EXPECT_EQ(OB_ITER_END, iterator.next(second_scn, next_log_min_scn, iterate_end_by_replayable_point));
// iterator返回OB_ITER_END,next_log_min_scn为first_scn+1
EXPECT_EQ(next_log_min_scn, SCN::plus(first_scn, 1));
EXPECT_EQ(iterate_end_by_replayable_point, false);
CLOG_LOG(INFO, "runlin trace 3", K(iterator), K(end_lsn_v), KPC(end_lsn), K(first_scn), K(second_scn));
// 再次调用next,预期next_log_min_scn依旧为first_scn+1
EXPECT_EQ(OB_ITER_END, iterator.next(second_scn, next_log_min_scn, iterate_end_by_replayable_point));
// iterator返回OB_ITER_END,next_log_min_scn为first_scn+1
EXPECT_EQ(next_log_min_scn, SCN::plus(first_scn, 1));
}
CLOG_LOG(INFO, "runlin trace 4", K(iterator), K(end_lsn_v), KPC(end_lsn));
SCN prev_next_success_scn;
// 模拟到达replayable_point_scn,此时文件终点为second log, 预期next_log_min_scn为replayable_point_scn+1
// 同时replayable_point_scn < 缓存的日志时间戳
{
SCN second_scn = logts[1];
SCN replayable_point_scn = SCN::minus(second_scn, 1);
end_lsn_v = lsns[1]+log_size+sizeof(LogEntryHeader);
EXPECT_EQ(OB_ITER_END, iterator.next(replayable_point_scn, next_log_min_scn, iterate_end_by_replayable_point));
EXPECT_EQ(iterate_end_by_replayable_point, true);
// iterator返回OB_ITER_END,next_log_min_scn为replayable_point_scn + 1
PALF_LOG(INFO, "runliun trace 4.1", K(replayable_point_scn), K(next_log_min_scn),
K(iterator));
EXPECT_EQ(next_log_min_scn, SCN::plus(replayable_point_scn, 1));
// 再次调用next,预期next_log_min_scn还是replayable_point_scn+1
EXPECT_EQ(OB_ITER_END, iterator.next(replayable_point_scn, next_log_min_scn, iterate_end_by_replayable_point));
// iterator返回OB_ITER_END,next_log_min_scn为replayable_point_scn+1
EXPECT_EQ(next_log_min_scn, SCN::plus(replayable_point_scn, 1));
EXPECT_EQ(iterate_end_by_replayable_point, true);
EXPECT_EQ(OB_SUCCESS, iterator.next(second_scn, next_log_min_scn, iterate_end_by_replayable_point)); count--;
EXPECT_EQ(next_log_min_scn.is_valid(), false);
prev_next_success_scn = iterator.iterator_impl_.prev_entry_scn_;
EXPECT_EQ(prev_next_success_scn, second_scn);
}
// 模拟file end lsn不是group entry的终点
{
// 设置终点为第三条日志LogEntry对应的起点
end_lsn_v = lsns[2]+10;
// 设置时间戳为第三条日志
SCN third_scn = logts[2];
SCN replayable_point_scn = SCN::plus(third_scn, 10);
CLOG_LOG(INFO, "runlin trace 5.1", K(iterator), K(end_lsn_v), KPC(end_lsn), K(replayable_point_scn));
// 此时内存中缓存的日志为第三条日志, iterator读取过新日志,但该日志由于end_lsn的原因不可读(此时,由于日志非受控回放,因此curr_read_pos_会被递推56)
// 因此next_log_min_scn会被设置为third_scn
EXPECT_EQ(OB_ITER_END, iterator.next(replayable_point_scn, next_log_min_scn, iterate_end_by_replayable_point));
CLOG_LOG(INFO, "runlin trace 5.1.1", K(iterator), K(next_log_min_scn), K(replayable_point_scn));
EXPECT_EQ(next_log_min_scn, third_scn);
EXPECT_EQ(iterate_end_by_replayable_point, false);
// 验证第三条日志由于受控回放无法吐出(replayable_point_scn回退是不可能出现的,为了测试故意模拟)
replayable_point_scn = SCN::minus(third_scn, 4);
EXPECT_EQ(OB_ITER_END, iterator.next(replayable_point_scn, next_log_min_scn, iterate_end_by_replayable_point));
// 由于replayable_point_scn与curr_entry_之间不可能有日志,同时replayable_point_scn<curr_entry_,
// 由于prev_entry_scn_此时为第二条日志对应的时间戳,小于replayable_point_scn,因此
// next_min_scn会被设置为curr_entry_ scn和replayable_point_scn+1最小值,
// 因此prev_entry_scn_会推到到replayable_point_scn+1
// 同时由于prev_entry_scn_小于replayable_point_scn,同时replayable_point_scn和prev_entry_scn_之间没有日志
// 因此,推到prev_entry_scn_为replayable_point_scn_
EXPECT_EQ(next_log_min_scn, SCN::plus(replayable_point_scn, 1));
EXPECT_EQ(iterate_end_by_replayable_point, true);
EXPECT_EQ(iterator.iterator_impl_.prev_entry_scn_, replayable_point_scn);
prev_next_success_scn = replayable_point_scn;
EXPECT_EQ(replayable_point_scn, iterator.iterator_impl_.prev_entry_scn_);
CLOG_LOG(INFO, "runlin trace 5.2", K(iterator), K(end_lsn_v), KPC(end_lsn), K(replayable_point_scn));
// 将replayable_point_scn变小,此时iterator会将next_min_scn设置为prev_next_success_scn + 1
replayable_point_scn = SCN::minus(replayable_point_scn, 2);
EXPECT_EQ(OB_ITER_END, iterator.next(replayable_point_scn, next_log_min_scn, iterate_end_by_replayable_point));
EXPECT_EQ(SCN::plus(prev_next_success_scn, 1), next_log_min_scn);
}
end_lsn_v = LSN(1000000000);
while (count > 0) {
EXPECT_EQ(OB_SUCCESS, iterator.next(max_scn_case3, next_log_min_scn, iterate_end_by_replayable_point));
prev_next_success_scn = iterator.iterator_impl_.prev_entry_scn_;
EXPECT_EQ(false, next_log_min_scn.is_valid());
count--;
}
CLOG_LOG(INFO, "runlin trace 6.1", K(iterator), K(end_lsn_v), K(max_scn_case3));
// 磁盘上以及受控回放点之后没有可读日志,此时应该返回受控回放点+1
EXPECT_EQ(OB_ITER_END, iterator.next(max_scn_case3, next_log_min_scn, iterate_end_by_replayable_point));
EXPECT_EQ(SCN::plus(max_scn_case3, 1), next_log_min_scn);
EXPECT_EQ(max_scn_case3, prev_next_success_scn);
CLOG_LOG(INFO, "runlin trace 6.2", K(iterator), K(end_lsn_v), K(max_scn_case3), "end_lsn_of_leader",
raw_write_leader.palf_handle_impl_->get_max_lsn());
// raw write 变为 Append后,在写入一些日志
// 测试raw write变apend后,迭代日志是否正常
{
std::vector<SCN> logts_append;
std::vector<LSN> lsns_append;
int count_append = 5;
EXPECT_EQ(OB_SUCCESS, change_access_mode_to_append(raw_write_leader));
PALF_LOG(INFO, "runlin trace 6.3", "raw_write_leader_lsn", raw_write_leader.palf_handle_impl_->get_max_lsn(),
"new_leader_lsn", leader.palf_handle_impl_->get_max_lsn());
EXPECT_EQ(OB_SUCCESS, submit_log_private(leader, count_append, id, log_size, lsns_append, logts_append));
EXPECT_EQ(OB_SUCCESS, submit_log_private(raw_write_leader, count_append, id, log_size, lsns_append, logts_append));
EXPECT_EQ(OB_SUCCESS, wait_lsn_until_flushed(leader.palf_handle_impl_->get_max_lsn(), leader));
EXPECT_EQ(OB_SUCCESS, wait_lsn_until_flushed(raw_write_leader.palf_handle_impl_->get_max_lsn(), raw_write_leader));
PALF_LOG(INFO, "runlin trace 6.4", "raw_write_leader_lsn", raw_write_leader.palf_handle_impl_->get_max_lsn(),
"new_leader_lsn", leader.palf_handle_impl_->get_max_lsn());
// case 7 end_lsn_v 为很大的值之后,让内存中有2M数据, 预期iterator next会由于受控回放失败,prev_entry_scn_不变
// replayable_point_scn 为第一条日志的时间戳-2, next_log_min_scn 为append第一条LogEntry的时间戳
// NB: 如果不将数据读到内存中来,可能会出现读数据报错OB_NEED_RETRY的问题。
end_lsn_v = LSN(1000000000);
SCN replayable_point_scn = SCN::minus(logts_append[0], 2);
EXPECT_EQ(OB_SUCCESS, iterator.next(replayable_point_scn, next_log_min_scn, iterate_end_by_replayable_point)); count_append--;
prev_next_success_scn = iterator.iterator_impl_.prev_entry_scn_;
end_lsn_v = lsns_append[1]+2;
// 此时curr_entry_为第二条日志, curr_entry有效但由于file end lsn不可读
// 对于append 日志受控回放无效
replayable_point_scn = SCN::plus(raw_write_leader.palf_handle_impl_->get_max_scn(), 1000000);
EXPECT_EQ(OB_ITER_END, iterator.next(replayable_point_scn, next_log_min_scn, iterate_end_by_replayable_point));
PALF_LOG(INFO, "runlin trace 7.1", K(iterator), K(replayable_point_scn), K(end_lsn_v), K(logts_append[1]), K(replayable_point_scn));
EXPECT_EQ(next_log_min_scn, logts_append[1]);
EXPECT_EQ(prev_next_success_scn, iterator.iterator_impl_.prev_entry_scn_);
EXPECT_EQ(iterate_end_by_replayable_point, false);
PALF_LOG(INFO, "runlin trace 7.1.1", K(iterator), K(replayable_point_scn), K(end_lsn_v), K(logts_append[1]));
EXPECT_EQ(OB_ITER_END, iterator.next(replayable_point_scn, next_log_min_scn, iterate_end_by_replayable_point));
EXPECT_EQ(next_log_min_scn, logts_append[1]);
EXPECT_EQ(prev_next_success_scn, iterator.iterator_impl_.prev_entry_scn_);
// replayable_point_scn回退是一个不可能出现的情况, 但从iterator视角不能依赖这个
// 验证replayable_point_scn回退到一个很小的值,预期next_log_min_scn为prev_next_success_scn+1
// 模拟replayable_point_scn小于prev_entry_
replayable_point_scn.convert_for_tx(100);
PALF_LOG(INFO, "runlin trace 7.2", K(iterator), K(replayable_point_scn), K(end_lsn_v), K(logts_append[0]));
EXPECT_EQ(OB_ITER_END, iterator.next(replayable_point_scn, next_log_min_scn, iterate_end_by_replayable_point));
EXPECT_EQ(SCN::plus(prev_next_success_scn, 1), next_log_min_scn);
EXPECT_EQ(prev_next_success_scn, iterator.iterator_impl_.prev_entry_scn_);
EXPECT_EQ(iterate_end_by_replayable_point, false);
// 在迭代一次,结果一样
EXPECT_EQ(OB_ITER_END, iterator.next(replayable_point_scn, next_log_min_scn, iterate_end_by_replayable_point));
EXPECT_EQ(SCN::plus(prev_next_success_scn, 1), next_log_min_scn);
EXPECT_EQ(prev_next_success_scn, iterator.iterator_impl_.prev_entry_scn_);
// 验证replayable_point_scn的值为prev_next_success_scn和第二条append的日志之间,
// 预期next_log_min_scn为replayable_point_scn+1
// 模拟replayable_point_scn位于[prev_entry_, curr_entry_]
replayable_point_scn = SCN::minus(logts_append[1], 4);
PALF_LOG(INFO, "runlin trace 7.3", K(iterator), K(replayable_point_scn), K(end_lsn_v),
K(logts_append[1]), K(prev_next_success_scn));
EXPECT_EQ(OB_ITER_END, iterator.next(replayable_point_scn, next_log_min_scn, iterate_end_by_replayable_point));
EXPECT_EQ(next_log_min_scn, SCN::plus(replayable_point_scn, 1));
// 由于replayable_point_scn到curr_entry_之间没有日志,因此prev_entry_scn_会被推到replayable_point_scn
EXPECT_EQ(replayable_point_scn, iterator.iterator_impl_.prev_entry_scn_);
// 在迭代一次
EXPECT_EQ(OB_ITER_END, iterator.next(replayable_point_scn, next_log_min_scn, iterate_end_by_replayable_point));
EXPECT_EQ(next_log_min_scn, SCN::plus(replayable_point_scn, 1));
// 由于replayable_point_scn到curr_entry_之间没有日志,因此prev_entry_scn_会被推到replayable_point_scn
EXPECT_EQ(replayable_point_scn, iterator.iterator_impl_.prev_entry_scn_);
// 验证迭代append日志成功,
end_lsn_v = lsns_append[2]+2;
replayable_point_scn = logts_append[0];
EXPECT_EQ(OB_SUCCESS, iterator.next(replayable_point_scn, next_log_min_scn, iterate_end_by_replayable_point));
EXPECT_EQ(false, next_log_min_scn.is_valid());
EXPECT_EQ(logts_append[1], iterator.iterator_impl_.prev_entry_scn_); count_append--;
prev_next_success_scn = logts_append[1];
// replayable_point_scn比较大,预期next_log_min_scn为logts_append[2]
replayable_point_scn.convert_from_ts(ObTimeUtility::current_time() + 100000000);
PALF_LOG(INFO, "runlin trace 7.4", K(iterator), K(replayable_point_scn), K(end_lsn_v),
K(logts_append[2]), K(prev_next_success_scn));
EXPECT_EQ(OB_ITER_END, iterator.next(replayable_point_scn, next_log_min_scn, iterate_end_by_replayable_point));
EXPECT_EQ(next_log_min_scn, logts_append[2]);
// 在迭代一次,结果一样
EXPECT_EQ(OB_ITER_END, iterator.next(replayable_point_scn, next_log_min_scn, iterate_end_by_replayable_point));
EXPECT_EQ(next_log_min_scn, logts_append[2]);
EXPECT_EQ(iterate_end_by_replayable_point, false);
// 回退replayable_point_scn,预期next_log_min_scn为prev_next_success_scn+1
replayable_point_scn.convert_for_tx(100);
EXPECT_EQ(OB_ITER_END, iterator.next(replayable_point_scn, next_log_min_scn, iterate_end_by_replayable_point));
EXPECT_EQ(SCN::plus(prev_next_success_scn, 1), next_log_min_scn);
EXPECT_EQ(iterate_end_by_replayable_point, false);
end_lsn_v = LSN(1000000000);
replayable_point_scn.convert_from_ts(ObTimeUtility::current_time() + 100000000);
// 留一条日志
while (count_append > 1) {
EXPECT_EQ(OB_SUCCESS, iterator.next(replayable_point_scn, next_log_min_scn, iterate_end_by_replayable_point));
EXPECT_EQ(false, next_log_min_scn.is_valid());
prev_next_success_scn = iterator.iterator_impl_.prev_entry_scn_;
count_append--;
}
// 验证append切回raw后是否正常工作
{
int64_t id3 = ATOMIC_AAF(&palf_id_, 1);
std::vector<SCN> logts_append;
std::vector<LSN> lsns_append;
int count_append = 5;
PALF_LOG(INFO, "runlin trace 8.1.0", "raw_write_leader_lsn", raw_write_leader.palf_handle_impl_->get_max_lsn(),
"new_leader_lsn", leader.palf_handle_impl_->get_max_lsn());
EXPECT_EQ(OB_SUCCESS, submit_log_private(leader, count_append, id, log_size, lsns_append, logts_append));
SCN max_scn_case4 = leader.palf_handle_impl_->get_max_scn();
EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(leader, leader.palf_handle_impl_->get_max_lsn()));
EXPECT_EQ(OB_SUCCESS, change_access_mode_to_raw_write(raw_write_leader));
PALF_LOG(INFO, "runlin trace 8.1", "raw_write_leader_lsn", raw_write_leader.palf_handle_impl_->get_max_lsn(),
"new_leader_lsn", leader.palf_handle_impl_->get_max_lsn());
EXPECT_EQ(OB_ITER_END, read_and_submit_group_log(leader,
raw_write_leader,
raw_write_leader.palf_handle_impl_->get_max_lsn()));
EXPECT_EQ(OB_SUCCESS, wait_lsn_until_flushed(raw_write_leader.palf_handle_impl_->get_max_lsn(), raw_write_leader));
PALF_LOG(INFO, "runlin trace 8.2", "raw_write_leader_lsn", raw_write_leader.palf_handle_impl_->get_max_lsn(),
"new_leader_lsn", leader.palf_handle_impl_->get_max_lsn());
// replayable_point_scn偏小
SCN replayable_point_scn;
replayable_point_scn.convert_for_tx(100);
PALF_LOG(INFO, "runlin trace 8.3", K(iterator), K(replayable_point_scn), K(end_lsn_v),
K(logts_append[0]), K(prev_next_success_scn));
// 迭代前一轮的日志,不需要递减count_append
EXPECT_EQ(OB_SUCCESS, iterator.next(replayable_point_scn, next_log_min_scn, iterate_end_by_replayable_point));
prev_next_success_scn = iterator.iterator_impl_.prev_entry_scn_;
// 由于受控回放点不可读, next_log_min_scn应该为prev_next_success_scn+1
EXPECT_EQ(OB_ITER_END, iterator.next(replayable_point_scn, next_log_min_scn, iterate_end_by_replayable_point));
EXPECT_EQ(SCN::plus(prev_next_success_scn, 1), next_log_min_scn);
PALF_LOG(INFO, "runlin trace 8.3.1", K(iterator), K(replayable_point_scn), K(end_lsn_v),
K(logts_append[0]), K(prev_next_success_scn));
EXPECT_EQ(OB_ITER_END, iterator.next(replayable_point_scn, next_log_min_scn, iterate_end_by_replayable_point));
EXPECT_EQ(SCN::plus(prev_next_success_scn, 1), next_log_min_scn);
// 推大受控回放点到第一条日志,但end_lsn_v也变为第一条日志的起点,此时会由于end_lsn_v不可读
// 预期next_min_scn为replayable_point_scn.
// 由于这条日志在此前的next中,不需要受控回放,会推大curr_read_pos_到LogEntry头,再次next不需要读数据直接返回OB_ITER_END
end_lsn_v = lsns_append[0]+10;
replayable_point_scn = logts_append[0];
PALF_LOG(INFO, "runlin trace 8.4", K(iterator), K(replayable_point_scn), K(end_lsn_v),
K(logts_append[0]), K(prev_next_success_scn));
EXPECT_EQ(OB_ITER_END, iterator.next(replayable_point_scn, next_log_min_scn, iterate_end_by_replayable_point));
EXPECT_EQ(replayable_point_scn, next_log_min_scn);
EXPECT_EQ(iterate_end_by_replayable_point, false);
PALF_LOG(INFO, "runlin trace 8.4.1", K(iterator), K(replayable_point_scn), K(end_lsn_v),
K(logts_append[0]), K(prev_next_success_scn));
EXPECT_EQ(OB_ITER_END, iterator.next(replayable_point_scn, next_log_min_scn, iterate_end_by_replayable_point));
EXPECT_EQ(replayable_point_scn, next_log_min_scn);
EXPECT_EQ(iterate_end_by_replayable_point, false);
PALF_LOG(INFO, "runlin trace 8.4.2", K(iterator), K(replayable_point_scn), K(end_lsn_v),
K(logts_append[0]), K(prev_next_success_scn));
// 模拟prev_entry_后没有日志,replayable_point_scn小于prev_entry_scn_, 后续日志都需要受控回放
// replayable_point_scn回退是不会出现的事,此时next_min_scn会返回prev_entry_scn_+1
replayable_point_scn = SCN::minus(prev_next_success_scn, 100);
EXPECT_EQ(OB_ITER_END, iterator.next(replayable_point_scn, next_log_min_scn, iterate_end_by_replayable_point));
EXPECT_EQ(SCN::plus(prev_next_success_scn, 1), next_log_min_scn);
EXPECT_EQ(true, iterate_end_by_replayable_point);
EXPECT_EQ(OB_ITER_END, iterator.next(replayable_point_scn, next_log_min_scn, iterate_end_by_replayable_point));
EXPECT_EQ(SCN::plus(prev_next_success_scn, 1), next_log_min_scn);
// 模拟prev_entry后有日志
// 推大end_lsn_v到第二条日志的起点
end_lsn_v = lsns_append[1]+2;
replayable_point_scn = logts_append[1];
PALF_LOG(INFO, "runlin trace 8.5", K(iterator), K(replayable_point_scn), K(end_lsn_v),
K(logts_append[1]), K(prev_next_success_scn));
EXPECT_EQ(OB_SUCCESS, iterator.next(replayable_point_scn, next_log_min_scn, iterate_end_by_replayable_point));
EXPECT_EQ(next_log_min_scn.is_valid(), false);
prev_next_success_scn = iterator.iterator_impl_.prev_entry_scn_;
EXPECT_EQ(prev_next_success_scn, logts_append[0]);
PALF_LOG(INFO, "runlin trace 8.6", K(iterator), K(replayable_point_scn), K(end_lsn_v),
K(logts_append[1]), K(prev_next_success_scn));
// 模拟prev_entry_后有日志, 但不可见的情况
// 此时会由于replayable_point_scn不吐出第二条日志
// 模拟replayable_point_scn在prev_entry_之前的情况
replayable_point_scn.convert_for_tx(100);
EXPECT_EQ(OB_ITER_END, iterator.next(replayable_point_scn, next_log_min_scn, iterate_end_by_replayable_point));
EXPECT_EQ(SCN::plus(prev_next_success_scn, 1), next_log_min_scn);
EXPECT_EQ(OB_ITER_END, iterator.next(replayable_point_scn, next_log_min_scn, iterate_end_by_replayable_point));
EXPECT_EQ(SCN::plus(prev_next_success_scn, 1), next_log_min_scn);
// 模拟replayable_point_scn在prev_entry_之后的情况, 由于prev_enty_后有日志,因此
// prev_entry_到replayable_point_scn之间不可能有未读过的日志,
// 因此next_log_min_scn为replayable_point_scn + 1.
replayable_point_scn = SCN::plus(prev_next_success_scn , 2);
PALF_LOG(INFO, "runlin trace 8.7", K(iterator), K(replayable_point_scn), K(end_lsn_v),
K(logts_append[1]), K(prev_next_success_scn));
EXPECT_EQ(OB_ITER_END, iterator.next(replayable_point_scn, next_log_min_scn, iterate_end_by_replayable_point));
EXPECT_EQ(SCN::plus(replayable_point_scn, 1), next_log_min_scn);
EXPECT_EQ(OB_ITER_END, iterator.next(replayable_point_scn, next_log_min_scn, iterate_end_by_replayable_point));
EXPECT_EQ(SCN::plus(replayable_point_scn, 1), next_log_min_scn);
// 模拟replayable_point_scn在curr_entry之后的情况
replayable_point_scn.convert_from_ts(ObTimeUtility::current_time() + 100000000);
PALF_LOG(INFO, "runlin trace 8.8", K(iterator), K(replayable_point_scn), K(end_lsn_v),
K(logts_append[1]), K(prev_next_success_scn));
EXPECT_EQ(OB_ITER_END, iterator.next(replayable_point_scn, next_log_min_scn, iterate_end_by_replayable_point));
EXPECT_EQ(logts_append[1], next_log_min_scn);
EXPECT_EQ(OB_ITER_END, iterator.next(replayable_point_scn, next_log_min_scn, iterate_end_by_replayable_point));
EXPECT_EQ(logts_append[1], next_log_min_scn);
end_lsn_v = LSN(1000000000);
EXPECT_EQ(OB_SUCCESS, iterator.next(replayable_point_scn, next_log_min_scn, iterate_end_by_replayable_point));
EXPECT_EQ(next_log_min_scn.is_valid(), false);
EXPECT_EQ(iterator.iterator_impl_.prev_entry_scn_, logts_append[1]);
prev_next_success_scn = iterator.iterator_impl_.prev_entry_scn_;
// 验证受控回放
replayable_point_scn.convert_for_tx(100);
EXPECT_EQ(OB_ITER_END, iterator.next(replayable_point_scn, next_log_min_scn, iterate_end_by_replayable_point));
EXPECT_EQ(SCN::plus(prev_next_success_scn, 1), next_log_min_scn);
EXPECT_EQ(true, iterate_end_by_replayable_point);
}
}
}
// 验证重启
restart_paxos_groups();
{
PalfHandleImplGuard raw_write_leader;
PalfBufferIterator iterator;
EXPECT_EQ(OB_SUCCESS, get_leader(id, raw_write_leader, leader_idx));
PalfHandleImpl *raw_write_palf_handle_impl = raw_write_leader.palf_handle_impl_;
auto get_file_end_lsn = []() -> LSN { return LSN(1000000000); };
auto get_mode_version = [&mode_version, &mode_version_v]() -> int64_t {
PALF_LOG(INFO, "runlin trace", K(*mode_version), K(mode_version_v));
return *mode_version;
};
EXPECT_EQ(OB_SUCCESS,
iterator.init(LSN(0), get_file_end_lsn, get_mode_version, &raw_write_palf_handle_impl->log_engine_.log_storage_));
SCN max_scn = raw_write_leader.palf_handle_impl_->get_max_scn();
int64_t count = 5 + 5 + 5 + 5 + 5;
while (count > 0) {
EXPECT_EQ(OB_SUCCESS, iterator.next(max_scn));
count--;
}
EXPECT_EQ(OB_ITER_END, iterator.next(max_scn));
EXPECT_EQ(OB_ITER_END, read_log_from_memory(raw_write_leader));
}
}
// Exercises LogEngine::get_min_block_info_for_gc() while log blocks are being
// filled and deleted. In every successful query the returned scn is compared
// with the min scn of the block that follows the current minimum block
// (obtained through get_block_min_scn()).
// NOTE(review): min_block_id is fetched on every call but never asserted here.
TEST_F(TestObSimpleLogClusterSingleReplica, test_gc_block)
{
  SET_CASE_LOG_FILE(TEST_NAME, "test_gc_block");
  OB_LOGGER.set_log_level("TRACE");
  int64_t id = ATOMIC_AAF(&palf_id_, 1);
  int64_t leader_idx = 0;
  PalfHandleImplGuard leader;
  EXPECT_EQ(OB_SUCCESS, create_paxos_group(id, leader_idx, leader));
  LogEngine *log_engine = &leader.palf_handle_impl_->log_engine_;
  block_id_t min_block_id;
  share::SCN min_block_scn;

  // Phase 1: freshly created group, nothing has been written yet.
  EXPECT_EQ(OB_ENTRY_NOT_EXIST, log_engine->get_min_block_info_for_gc(min_block_id, min_block_scn));

  // Phase 2: 31 entries are flushed, the query is still expected to fail, now
  // with OB_ERR_OUT_OF_UPPER_BOUND.
  // NOTE(review): presumably 31 entries of log_entry_size still fit into
  // block 0, so no following block exists yet - confirm against log_entry_size.
  EXPECT_EQ(OB_SUCCESS, submit_log(leader, 31, leader_idx, log_entry_size));
  EXPECT_EQ(OB_SUCCESS, wait_lsn_until_flushed(leader.palf_handle_impl_->get_max_lsn(), leader));
  EXPECT_EQ(OB_ERR_OUT_OF_UPPER_BOUND, log_engine->get_min_block_info_for_gc(min_block_id, min_block_scn));

  // Phase 3: one more entry makes the query succeed; the reported scn must
  // equal the min scn of block 1.
  EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, leader_idx, log_entry_size));
  EXPECT_EQ(OB_SUCCESS, wait_lsn_until_flushed(leader.palf_handle_impl_->get_max_lsn(), leader));
  block_id_t expect_block_id = 1;
  share::SCN expect_scn;
  EXPECT_EQ(OB_SUCCESS, log_engine->get_min_block_info_for_gc(min_block_id, min_block_scn));
  EXPECT_EQ(OB_SUCCESS, log_engine->get_block_min_scn(expect_block_id, expect_scn));
  EXPECT_EQ(expect_scn, min_block_scn);

  // Phase 4: after deleting block 0, min_block_max_scn_ is expected to be
  // invalid. After writing 100 more entries the reported scn must equal the
  // min scn of block 2.
  EXPECT_EQ(OB_SUCCESS, log_engine->delete_block(0));
  EXPECT_EQ(false, log_engine->min_block_max_scn_.is_valid());
  EXPECT_EQ(OB_SUCCESS, submit_log(leader, 100, leader_idx, log_entry_size));
  EXPECT_EQ(OB_SUCCESS, wait_lsn_until_flushed(leader.palf_handle_impl_->get_max_lsn(), leader));
  expect_block_id = 2;
  EXPECT_EQ(OB_SUCCESS, log_engine->get_min_block_info_for_gc(min_block_id, min_block_scn));
  EXPECT_EQ(OB_SUCCESS, log_engine->get_block_min_scn(expect_block_id, expect_scn));
  EXPECT_EQ(expect_scn, min_block_scn);

  // Phase 5: after deleting block 1 (no further writes), the reported scn must
  // equal the min scn of block 3.
  EXPECT_EQ(OB_SUCCESS, log_engine->delete_block(1));
  expect_block_id = 3;
  EXPECT_EQ(OB_SUCCESS, log_engine->get_min_block_info_for_gc(min_block_id, min_block_scn));
  EXPECT_EQ(OB_SUCCESS, log_engine->get_block_min_scn(expect_block_id, expect_scn));
  EXPECT_EQ(expect_scn, min_block_scn);
}
TEST_F(TestObSimpleLogClusterSingleReplica, test_iterator_with_flashback)
{
SET_CASE_LOG_FILE(TEST_NAME, "test_iterator_with_flashback");
OB_LOGGER.set_log_level("TRACE");
int64_t id = ATOMIC_AAF(&palf_id_, 1);
int64_t leader_idx = 0;
PalfHandleImplGuard leader;
PalfHandleImplGuard raw_write_leader;
EXPECT_EQ(OB_SUCCESS, create_paxos_group(id, leader_idx, leader));
PalfHandleImpl *palf_handle_impl = leader.palf_handle_impl_;
const int64_t id_raw_write = ATOMIC_AAF(&palf_id_, 1);
EXPECT_EQ(OB_SUCCESS, create_paxos_group(id_raw_write, leader_idx, raw_write_leader));
EXPECT_EQ(OB_SUCCESS, change_access_mode_to_raw_write(raw_write_leader));
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, leader_idx, 200));
SCN max_scn1 = leader.palf_handle_impl_->get_max_scn();
LSN end_pos_of_log1 = leader.palf_handle_impl_->get_max_lsn();
EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, leader_idx, 200));
EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(leader, leader.palf_handle_impl_->get_max_lsn()));
SCN max_scn2 = leader.palf_handle_impl_->get_max_scn();
LSN end_pos_of_log2 = leader.palf_handle_impl_->get_max_lsn();
EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(leader, leader.palf_handle_impl_->get_max_lsn()));
// 提交几条日志到raw_write leader
EXPECT_EQ(OB_ITER_END, read_and_submit_group_log(leader, raw_write_leader));
EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(raw_write_leader, raw_write_leader.palf_handle_impl_->get_max_lsn()));
PalfBufferIterator iterator;
EXPECT_EQ(OB_SUCCESS, raw_write_leader.palf_handle_impl_->alloc_palf_buffer_iterator(LSN(0), iterator));
// 迭代flashback之前的日志成功
SCN next_min_scn;
SCN tmp_scn; tmp_scn.val_ = 1000;
bool iterate_end_by_replayable_point = false;
EXPECT_EQ(OB_SUCCESS, iterator.next(max_scn1, next_min_scn, iterate_end_by_replayable_point));
EXPECT_EQ(false, iterate_end_by_replayable_point);
EXPECT_EQ(iterator.iterator_impl_.prev_entry_scn_, max_scn1);
EXPECT_EQ(OB_ITER_END, iterator.next(
max_scn1, next_min_scn, iterate_end_by_replayable_point));
EXPECT_EQ(true, iterate_end_by_replayable_point);
EXPECT_EQ(end_pos_of_log1, iterator.iterator_impl_.log_storage_->get_lsn(iterator.iterator_impl_.curr_read_pos_));
EXPECT_EQ(SCN::plus(max_scn1, 1), next_min_scn);
PALF_LOG(INFO, "runlin trace case1", K(iterator), K(end_pos_of_log1));
EXPECT_EQ(OB_SUCCESS, raw_write_leader.palf_handle_impl_->inner_flashback(max_scn2));
EXPECT_EQ(max_scn2, raw_write_leader.palf_handle_impl_->get_max_scn());
int64_t mode_version;
switch_flashback_to_append(raw_write_leader, mode_version);
// 磁盘上存在三条日志,一条日志已经迭代,另外一条日志没有迭代(raw_write),最后一条日志为Append
EXPECT_EQ(OB_SUCCESS, submit_log(raw_write_leader, 1, leader_idx, 333));
EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(raw_write_leader, raw_write_leader.palf_handle_impl_->get_max_lsn()));
EXPECT_EQ(OB_ITER_END, read_group_log(raw_write_leader, LSN(0)));
SCN max_scn3 = raw_write_leader.palf_handle_impl_->get_max_scn();
PALF_LOG(INFO, "runlin trace case2", K(iterator), K(max_scn3), "end_lsn:", raw_write_leader.palf_handle_impl_->get_end_lsn());
LSN iterator_end_lsn = iterator.iterator_storage_.end_lsn_;
// iterator内存中有几条日志,预期返回成功, 此时会清cache, 前一条日志的信息会被清除(raw_write日志)
// 迭代器游标预期依旧指向第一条日志的终点, 由于受控回放,返回iterate_end
EXPECT_EQ(OB_ITER_END, iterator.next(
max_scn1, next_min_scn, iterate_end_by_replayable_point));
EXPECT_EQ(end_pos_of_log1, iterator.iterator_impl_.log_storage_->get_lsn(iterator.iterator_impl_.curr_read_pos_));
EXPECT_EQ(true, iterator.iterator_impl_.curr_entry_is_raw_write_);
// 需要从磁盘上将后面两日志读上来,但由于受控回放不会吐出
// EXPECT_FALSE(iterator_end_lsn == iterator.iterator_storage_.end_lsn_);
EXPECT_EQ(OB_SUCCESS, iterator.next(max_scn2, next_min_scn, iterate_end_by_replayable_point));
EXPECT_EQ(false, iterate_end_by_replayable_point);
EXPECT_EQ(true, iterator.iterator_impl_.curr_entry_is_raw_write_);
EXPECT_EQ(iterator.iterator_impl_.prev_entry_scn_, max_scn2);
EXPECT_EQ(OB_SUCCESS, iterator.next(max_scn3, next_min_scn, iterate_end_by_replayable_point));
EXPECT_EQ(false, iterate_end_by_replayable_point);
EXPECT_EQ(false, iterator.iterator_impl_.curr_entry_is_raw_write_);
EXPECT_EQ(iterator.iterator_impl_.prev_entry_scn_, max_scn3);
// raw_write_leader已经有三条日志, raw_write(1 log entry), raw_write(1), append(1),
// 模拟一条group entry 中有多条小日志
LSN last_lsn = raw_write_leader.palf_handle_impl_->get_max_lsn();
SCN last_scn = raw_write_leader.palf_handle_impl_->get_max_scn();
LogIOWorker *io_worker = raw_write_leader.palf_handle_impl_->log_engine_.log_io_worker_;
IOTaskCond cond(id_raw_write, raw_write_leader.palf_env_impl_->last_palf_epoch_);
io_worker->submit_io_task(&cond);
std::vector<LSN> lsns;
std::vector<SCN> scns;
EXPECT_EQ(OB_SUCCESS, submit_log(raw_write_leader, 10, 100, id_raw_write, lsns, scns));
int group_entry_num = 1;
int first_log_entry_index = 0, last_log_entry_index = 0;
for (int i = 1; i < 10; i++) {
if (lsns[i-1]+100+sizeof(LogEntryHeader) == lsns[i]) {
last_log_entry_index = i;
} else {
first_log_entry_index = i;
group_entry_num++;
PALF_LOG(INFO, "group entry", K(i-1));
}
if (first_log_entry_index - last_log_entry_index > 2) {
break;
}
}
leader.reset();
if (first_log_entry_index != 1 && last_log_entry_index != 9) {
PALF_LOG(INFO, "no group log has more than 2 log entry", K(first_log_entry_index), K(last_log_entry_index));
return;
}
cond.cond_.signal();
// 验证从一条包含多条LogEntry中日志中flashback,iterator迭代到中间的LogEntry后,flashback位点前还有几条LogEntry
// LogGroup LogGroup LogGroup LogGroup LogGroup(9条小日志)
EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(raw_write_leader, raw_write_leader.palf_handle_impl_->get_max_lsn()));
{
const int64_t id_new_raw_write = ATOMIC_AAF(&palf_id_, 1);
PalfHandleImplGuard new_raw_write_leader;
EXPECT_EQ(OB_SUCCESS, create_paxos_group(id_new_raw_write, leader_idx, new_raw_write_leader));
EXPECT_EQ(OB_SUCCESS, change_access_mode_to_raw_write(new_raw_write_leader));
EXPECT_EQ(OB_ITER_END, read_and_submit_group_log(raw_write_leader, new_raw_write_leader));
PalfBufferIterator buff_iterator;
PalfGroupBufferIterator group_iterator;
EXPECT_EQ(OB_SUCCESS, new_raw_write_leader.palf_handle_impl_->alloc_palf_buffer_iterator(LSN(0), buff_iterator));
EXPECT_EQ(OB_SUCCESS, new_raw_write_leader.palf_handle_impl_->alloc_palf_group_buffer_iterator(LSN(0), group_iterator));
SCN replayable_point_scn(SCN::min_scn());
// 验证replayable_point_scn为min_scn
EXPECT_EQ(OB_ITER_END, buff_iterator.next(replayable_point_scn, next_min_scn, iterate_end_by_replayable_point));
EXPECT_EQ(iterate_end_by_replayable_point, true);
EXPECT_EQ(next_min_scn, SCN::plus(replayable_point_scn, 1));
EXPECT_EQ(buff_iterator.iterator_impl_.prev_entry_scn_, replayable_point_scn);
EXPECT_EQ(OB_ITER_END, group_iterator.next(replayable_point_scn, next_min_scn, iterate_end_by_replayable_point));
EXPECT_EQ(iterate_end_by_replayable_point, true);
EXPECT_EQ(next_min_scn, SCN::plus(replayable_point_scn, 1));
EXPECT_EQ(group_iterator.iterator_impl_.prev_entry_scn_, replayable_point_scn);
// replayable_point_scn为第一条日志-1
replayable_point_scn = SCN::minus(max_scn1, 1);
EXPECT_EQ(OB_ITER_END, buff_iterator.next(replayable_point_scn, next_min_scn, iterate_end_by_replayable_point));
EXPECT_EQ(iterate_end_by_replayable_point, true);
EXPECT_EQ(next_min_scn, SCN::plus(replayable_point_scn, 1));
EXPECT_EQ(buff_iterator.iterator_impl_.prev_entry_scn_, replayable_point_scn);
EXPECT_EQ(OB_ITER_END, group_iterator.next(replayable_point_scn, next_min_scn, iterate_end_by_replayable_point));
EXPECT_EQ(iterate_end_by_replayable_point, true);
EXPECT_EQ(next_min_scn, SCN::plus(replayable_point_scn, 1));
EXPECT_EQ(group_iterator.iterator_impl_.prev_entry_scn_, replayable_point_scn);
// replayable_point_scn为第一条日志
replayable_point_scn = max_scn1;
EXPECT_EQ(OB_SUCCESS, buff_iterator.next(replayable_point_scn, next_min_scn, iterate_end_by_replayable_point));
EXPECT_EQ(iterate_end_by_replayable_point, false);
EXPECT_EQ(buff_iterator.iterator_impl_.prev_entry_scn_, replayable_point_scn);
EXPECT_EQ(OB_SUCCESS, group_iterator.next(replayable_point_scn, next_min_scn, iterate_end_by_replayable_point));
EXPECT_EQ(iterate_end_by_replayable_point, false);
EXPECT_EQ(group_iterator.iterator_impl_.prev_entry_scn_, replayable_point_scn);
// replayable_point_scn为第一条日志 + 1
replayable_point_scn = SCN::plus(max_scn1, 1);
EXPECT_EQ(OB_ITER_END, buff_iterator.next(replayable_point_scn, next_min_scn, iterate_end_by_replayable_point));
EXPECT_EQ(next_min_scn, SCN::plus(replayable_point_scn, 1));
EXPECT_EQ(iterate_end_by_replayable_point, true);
EXPECT_EQ(buff_iterator.iterator_impl_.prev_entry_scn_, replayable_point_scn);
EXPECT_EQ(OB_ITER_END, group_iterator.next(replayable_point_scn, next_min_scn, iterate_end_by_replayable_point));
EXPECT_EQ(next_min_scn, SCN::plus(replayable_point_scn, 1));
EXPECT_EQ(iterate_end_by_replayable_point, true);
EXPECT_EQ(group_iterator.iterator_impl_.prev_entry_scn_, replayable_point_scn);
// 成功迭代第二条日志,第三条日志
replayable_point_scn = last_scn;
EXPECT_EQ(OB_SUCCESS, buff_iterator.next(replayable_point_scn));
EXPECT_EQ(OB_SUCCESS, group_iterator.next(replayable_point_scn));
EXPECT_EQ(OB_SUCCESS, buff_iterator.next(replayable_point_scn));
EXPECT_EQ(OB_SUCCESS, group_iterator.next(replayable_point_scn));
// 第四条日志一定是LogGroupEntry
replayable_point_scn = scns[0];
EXPECT_EQ(OB_SUCCESS, buff_iterator.next(replayable_point_scn));
EXPECT_EQ(buff_iterator.iterator_impl_.prev_entry_scn_, replayable_point_scn);
EXPECT_EQ(OB_SUCCESS, group_iterator.next(replayable_point_scn));
EXPECT_EQ(group_iterator.iterator_impl_.prev_entry_scn_, replayable_point_scn);
// 迭代第五条LogGroupEntry的第一条LogEntry
EXPECT_EQ(OB_ITER_END, buff_iterator.next(replayable_point_scn, next_min_scn, iterate_end_by_replayable_point));
EXPECT_EQ(next_min_scn, SCN::plus(replayable_point_scn, 1));
EXPECT_EQ(iterate_end_by_replayable_point, true);
EXPECT_EQ(buff_iterator.iterator_impl_.prev_entry_scn_, replayable_point_scn);
EXPECT_NE(buff_iterator.iterator_impl_.log_storage_->get_lsn(
buff_iterator.iterator_impl_.curr_read_pos_), lsns[1]);
EXPECT_EQ(OB_ITER_END, group_iterator.next(replayable_point_scn, next_min_scn, iterate_end_by_replayable_point));
EXPECT_EQ(next_min_scn, SCN::plus(replayable_point_scn, 1));
EXPECT_EQ(iterate_end_by_replayable_point, true);
EXPECT_EQ(group_iterator.iterator_impl_.prev_entry_scn_, replayable_point_scn);
EXPECT_NE(group_iterator.iterator_impl_.log_storage_->get_lsn(
group_iterator.iterator_impl_.curr_read_pos_), lsns[1]);
// 由于被受控回放,buff_iterator以及group_iterator都没有推进curr_read_pos_
EXPECT_EQ(group_iterator.iterator_impl_.log_storage_->get_lsn(
group_iterator.iterator_impl_.curr_read_pos_),
buff_iterator.iterator_impl_.log_storage_->get_lsn(
buff_iterator.iterator_impl_.curr_read_pos_));
// 成功迭代第五条LogGroupEntry的第一条LogEntry
replayable_point_scn = scns[1];
EXPECT_EQ(OB_SUCCESS, buff_iterator.next(replayable_point_scn, next_min_scn, iterate_end_by_replayable_point));
EXPECT_EQ(buff_iterator.iterator_impl_.prev_entry_scn_, replayable_point_scn);
EXPECT_EQ(buff_iterator.iterator_impl_.log_storage_->get_lsn(
buff_iterator.iterator_impl_.curr_read_pos_), lsns[1]);
// group iterator被受控回放, 但由于第五条日志的max_scn大于受控回放点,故受控回放
EXPECT_EQ(OB_ITER_END, group_iterator.next(replayable_point_scn, next_min_scn, iterate_end_by_replayable_point));
// 由于受控回放的group entry对应的min scn和replayable_point_scn一样,因此next_min_scn会被设置为replayable_point_scn
EXPECT_EQ(next_min_scn, replayable_point_scn);
EXPECT_EQ(iterate_end_by_replayable_point, true);
EXPECT_EQ(group_iterator.iterator_impl_.prev_entry_scn_, scns[0]);
EXPECT_NE(group_iterator.iterator_impl_.log_storage_->get_lsn(
group_iterator.iterator_impl_.curr_read_pos_), lsns[1]);
// 由于被第一条LogEntry受控回放,group_iterator没有推进curr_read_pos_, buff_iter推进了curr_read_pos_
EXPECT_NE(group_iterator.iterator_impl_.log_storage_->get_lsn(
group_iterator.iterator_impl_.curr_read_pos_),
buff_iterator.iterator_impl_.log_storage_->get_lsn(
buff_iterator.iterator_impl_.curr_read_pos_));
// buff_iterator的游标到了第五条group_entry的第一条小日志
// grou_iterator的游标到了第五条group_entry开头
// sncs[0] 第四条group entry,scns[1] - scns[9]是第二条
// 第五条group entry的第五条小日志被flashback
EXPECT_EQ(OB_SUCCESS, new_raw_write_leader.palf_handle_impl_->inner_flashback(scns[4]));
EXPECT_EQ(new_raw_write_leader.palf_handle_impl_->get_max_scn(), scns[4]);
EXPECT_EQ(OB_SUCCESS, change_access_mode_to_append(new_raw_write_leader));
// 提交一条group_entry
// 对于buff_iterator, 存在两条group_entry未读,一条raw_rwrite(包含4条小日志,游标停留在第一条小日志末尾),一条append
// 对于group_iterator, 存在三条group_entry未读,一条raw_rwrite(包含4条小日志,游标停留在group_entry头部),一条append
EXPECT_EQ(OB_SUCCESS, submit_log(new_raw_write_leader, 1, leader_idx, 100));
EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(new_raw_write_leader, new_raw_write_leader.palf_handle_impl_->get_max_lsn()));
// 对于buff_iterator
// lsns[2]为第二条小日志开头,即第一条小日志末尾
// 验证游标起始位置为第一条小日志头部
// next 返回iterate是否清空cache
// 迭代raw_write写入的小日志
// 迭代append写入的小日志
PALF_LOG(INFO, "rulin trace 1", K(lsns[2]), K(lsns[1]), K(lsns[0]), K(buff_iterator));
EXPECT_EQ(buff_iterator.iterator_impl_.log_storage_->get_lsn(buff_iterator.iterator_impl_.curr_read_pos_), lsns[1]);
EXPECT_EQ(OB_ITER_END, buff_iterator.next(SCN::min_scn(), next_min_scn, iterate_end_by_replayable_point));
EXPECT_EQ(true, iterate_end_by_replayable_point);
EXPECT_EQ(next_min_scn, SCN::plus(buff_iterator.iterator_impl_.prev_entry_scn_, 1));
EXPECT_EQ(0, buff_iterator.iterator_impl_.curr_read_pos_);
// 迭代第二条日志
EXPECT_EQ(OB_SUCCESS, buff_iterator.next(SCN::max_scn()));
// 迭代第三条日志
EXPECT_EQ(OB_SUCCESS, buff_iterator.next(SCN::max_scn()));
// 迭代第四条日志
EXPECT_EQ(OB_SUCCESS, buff_iterator.next(SCN::max_scn()));
// 迭代第五条日志(迭代新的GroupENtry, 非受控回放)
EXPECT_EQ(OB_SUCCESS, buff_iterator.next(SCN::min_scn()));
EXPECT_EQ(OB_ITER_END, buff_iterator.next(SCN::min_scn()));
// 对于group_iterator
// 验证游标起始位置为raw_write日志开头
// next 返回iterate是否清空cache
// 迭代raw_write写入的大日志
// 迭代append写入的大日志
PALF_LOG(INFO, "rulin trace 2", K(lsns[2]), K(lsns[1]), K(lsns[0]), K(group_iterator));
EXPECT_EQ(group_iterator.iterator_impl_.log_storage_->get_lsn(group_iterator.iterator_impl_.curr_read_pos_), lsns[1] - sizeof(LogGroupEntryHeader));
EXPECT_EQ(OB_ITER_END, group_iterator.next(SCN::min_scn(), next_min_scn, iterate_end_by_replayable_point));
EXPECT_EQ(true, iterate_end_by_replayable_point);
EXPECT_EQ(next_min_scn, SCN::plus(group_iterator.iterator_impl_.prev_entry_scn_, 1));
// 迭代raw_write日志
EXPECT_EQ(OB_SUCCESS, group_iterator.next(SCN::max_scn()));
EXPECT_EQ(OB_SUCCESS, group_iterator.next(SCN::max_scn()));
EXPECT_EQ(OB_ITER_END, group_iterator.next(SCN::max_scn()));
}
// 验证从一条包含多条LogEntry中日志中flashback,iterator迭代到中间的LogEntry后,flashback位点前没有LogEntry
// LogGroup LogGroup LogGroup LogGroup LogGroup(9条小日志)
EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(raw_write_leader, raw_write_leader.palf_handle_impl_->get_max_lsn()));
{
const int64_t id_new_raw_write = ATOMIC_AAF(&palf_id_, 1);
PalfHandleImplGuard new_raw_write_leader;
EXPECT_EQ(OB_SUCCESS, create_paxos_group(id_new_raw_write, leader_idx, new_raw_write_leader));
EXPECT_EQ(OB_SUCCESS, change_access_mode_to_raw_write(new_raw_write_leader));
EXPECT_EQ(OB_ITER_END, read_and_submit_group_log(raw_write_leader, new_raw_write_leader));
PalfBufferIterator buff_iterator;
PalfGroupBufferIterator group_iterator;
EXPECT_EQ(OB_SUCCESS, new_raw_write_leader.palf_handle_impl_->alloc_palf_buffer_iterator(LSN(0), buff_iterator));
EXPECT_EQ(OB_SUCCESS, new_raw_write_leader.palf_handle_impl_->alloc_palf_group_buffer_iterator(LSN(0), group_iterator));
// 成功迭代第一条日志,第二条日志,第三条日志
SCN replayable_point_scn(last_scn);
EXPECT_EQ(OB_SUCCESS, buff_iterator.next(replayable_point_scn, next_min_scn, iterate_end_by_replayable_point));
EXPECT_EQ(OB_SUCCESS, group_iterator.next(replayable_point_scn, next_min_scn, iterate_end_by_replayable_point));
EXPECT_EQ(OB_SUCCESS, buff_iterator.next(replayable_point_scn));
EXPECT_EQ(OB_SUCCESS, group_iterator.next(replayable_point_scn));
EXPECT_EQ(OB_SUCCESS, buff_iterator.next(replayable_point_scn));
EXPECT_EQ(OB_SUCCESS, group_iterator.next(replayable_point_scn));
// 第四条日志一定是LogGroupEntry
replayable_point_scn = scns[0];
EXPECT_EQ(OB_SUCCESS, buff_iterator.next(replayable_point_scn));
EXPECT_EQ(buff_iterator.iterator_impl_.prev_entry_scn_, replayable_point_scn);
EXPECT_EQ(OB_SUCCESS, group_iterator.next(replayable_point_scn));
EXPECT_EQ(group_iterator.iterator_impl_.prev_entry_scn_, replayable_point_scn);
// 迭代第五条LogGroupEntry的第一条LogEntry
EXPECT_EQ(OB_ITER_END, buff_iterator.next(replayable_point_scn, next_min_scn, iterate_end_by_replayable_point));
EXPECT_EQ(next_min_scn, SCN::plus(replayable_point_scn, 1));
EXPECT_EQ(iterate_end_by_replayable_point, true);
EXPECT_EQ(buff_iterator.iterator_impl_.prev_entry_scn_, replayable_point_scn);
EXPECT_NE(buff_iterator.iterator_impl_.log_storage_->get_lsn(
buff_iterator.iterator_impl_.curr_read_pos_), lsns[1]);
EXPECT_EQ(OB_ITER_END, group_iterator.next(replayable_point_scn, next_min_scn, iterate_end_by_replayable_point));
EXPECT_EQ(next_min_scn, SCN::plus(replayable_point_scn, 1));
EXPECT_EQ(iterate_end_by_replayable_point, true);
EXPECT_EQ(group_iterator.iterator_impl_.prev_entry_scn_, replayable_point_scn);
EXPECT_NE(group_iterator.iterator_impl_.log_storage_->get_lsn(
group_iterator.iterator_impl_.curr_read_pos_), lsns[1]);
// 由于被受控回放,buff_iterator以及group_iterator都没有推进curr_read_pos_
EXPECT_EQ(group_iterator.iterator_impl_.log_storage_->get_lsn(
group_iterator.iterator_impl_.curr_read_pos_),
buff_iterator.iterator_impl_.log_storage_->get_lsn(
buff_iterator.iterator_impl_.curr_read_pos_));
// 成功迭代第五条LogGroupEntry的第一条LogEntry
replayable_point_scn = scns[1];
EXPECT_EQ(OB_SUCCESS, buff_iterator.next(replayable_point_scn, next_min_scn, iterate_end_by_replayable_point));
EXPECT_EQ(buff_iterator.iterator_impl_.prev_entry_scn_, replayable_point_scn);
EXPECT_EQ(buff_iterator.iterator_impl_.log_storage_->get_lsn(
buff_iterator.iterator_impl_.curr_read_pos_), lsns[1]);
// group iterator被受控回放, 但由于第五条日志的max_scn大于受控回放点,故受控回放
EXPECT_EQ(OB_ITER_END, group_iterator.next(replayable_point_scn, next_min_scn, iterate_end_by_replayable_point));
// 由于受控回放的group entry对应的min scn和replayable_point_scn一样,因此next_min_scn会被设置为replayable_point_scn
EXPECT_EQ(next_min_scn, replayable_point_scn);
EXPECT_EQ(iterate_end_by_replayable_point, true);
EXPECT_EQ(group_iterator.iterator_impl_.prev_entry_scn_, scns[0]);
EXPECT_NE(group_iterator.iterator_impl_.log_storage_->get_lsn(
group_iterator.iterator_impl_.curr_read_pos_), lsns[1]);
// 由于被第一条LogEntry受控回放,group_iterator没有推进curr_read_pos_, buff_iter推进了curr_read_pos_
EXPECT_NE(group_iterator.iterator_impl_.log_storage_->get_lsn(
group_iterator.iterator_impl_.curr_read_pos_),
buff_iterator.iterator_impl_.log_storage_->get_lsn(
buff_iterator.iterator_impl_.curr_read_pos_));
// 迭代日志发现需要受控回放
EXPECT_EQ(OB_ITER_END, buff_iterator.next(scns[1], next_min_scn, iterate_end_by_replayable_point));
// buff_iterator的游标到了第五条group_entry的第一条小日志末尾
// grou_iterator的游标到了第五条group_entry开头
// sncs[0] 第四条group entry,scns[1] - scns[9]是第二条
// 第五条group entry的第二条小日志被flashback
EXPECT_EQ(OB_SUCCESS, new_raw_write_leader.palf_handle_impl_->inner_flashback(scns[2]));
EXPECT_EQ(new_raw_write_leader.palf_handle_impl_->get_max_scn(), scns[2]);
EXPECT_EQ(OB_SUCCESS, change_access_mode_to_append(new_raw_write_leader));
// 提交一条group_entry
// 对于buff_iterator, 存在两条group_entry未读,一条raw_rwrite(包含4条小日志,游标停留在第一条小日志末尾),一条append
// 对于group_iterator, 存在三条group_entry未读,一条raw_rwrite(包含4条小日志,游标停留在group_entry头部),一条append
EXPECT_EQ(OB_SUCCESS, submit_log(new_raw_write_leader, 1, leader_idx, 100));
EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(new_raw_write_leader, new_raw_write_leader.palf_handle_impl_->get_max_lsn()));
// 对于buff_iterator
// lsns[2]为第二条小日志开头,即第一条小日志末尾
// 验证游标起始位置为第一条小日志头部
// next 返回iterate是否清空cache
// 迭代raw_write写入的小日志
// 迭代append写入的小日志
PALF_LOG(INFO, "rulin trace 3", K(lsns[2]), K(lsns[1]), K(lsns[0]), K(buff_iterator));
EXPECT_EQ(buff_iterator.iterator_impl_.log_storage_->get_lsn(buff_iterator.iterator_impl_.curr_read_pos_), lsns[2]);
EXPECT_EQ(OB_ITER_END, buff_iterator.next(SCN::min_scn(), next_min_scn, iterate_end_by_replayable_point));
EXPECT_EQ(true, iterate_end_by_replayable_point);
EXPECT_EQ(next_min_scn, SCN::plus(buff_iterator.iterator_impl_.prev_entry_scn_, 1));
EXPECT_EQ(0, buff_iterator.iterator_impl_.curr_read_pos_);
// 迭代第二条小日志
EXPECT_EQ(OB_SUCCESS, buff_iterator.next(SCN::max_scn()));
// 迭代新写入的LogGroupEntry, 不需要受控回放
EXPECT_EQ(OB_SUCCESS, buff_iterator.next(SCN::min_scn()));
EXPECT_EQ(OB_ITER_END, buff_iterator.next(SCN::min_scn()));
// 对于group_iterator
// 验证游标起始位置为raw_write日志开头
// next 返回iterate是否清空cache
// 迭代raw_write写入的大日志
// 迭代append写入的大日志
PALF_LOG(INFO, "rulin trace 4", K(lsns[2]), K(lsns[1]), K(lsns[0]), K(group_iterator));
EXPECT_EQ(group_iterator.iterator_impl_.log_storage_->get_lsn(group_iterator.iterator_impl_.curr_read_pos_), lsns[1] - sizeof(LogGroupEntryHeader));
EXPECT_EQ(OB_ITER_END, group_iterator.next(SCN::min_scn(), next_min_scn, iterate_end_by_replayable_point));
EXPECT_EQ(true, iterate_end_by_replayable_point);
EXPECT_EQ(next_min_scn, SCN::plus(group_iterator.iterator_impl_.prev_entry_scn_, 1));
// 迭代raw_write日志
EXPECT_EQ(OB_SUCCESS, group_iterator.next(SCN::max_scn()));
// 迭代新的GruopEntry
EXPECT_EQ(OB_SUCCESS, group_iterator.next(SCN::min_scn()));
EXPECT_EQ(OB_ITER_END, group_iterator.next(SCN::min_scn()));
}
// 验证一条LogGroupEntry需要受控回放,buff iterator不能更新accumlate_checksum和curr_read_pos_
// LogGroup LogGroup LogGroup LogGroup LogGroup(9条小日志)
// last_scn scns[0] scns[1]...
{
const int64_t id_new_raw_write = ATOMIC_AAF(&palf_id_, 1);
PalfHandleImplGuard new_raw_write_leader;
EXPECT_EQ(OB_SUCCESS, create_paxos_group(id_new_raw_write, leader_idx, new_raw_write_leader));
EXPECT_EQ(OB_SUCCESS, change_access_mode_to_raw_write(new_raw_write_leader));
EXPECT_EQ(OB_ITER_END, read_and_submit_group_log(raw_write_leader, new_raw_write_leader));
PalfBufferIterator iterator;
EXPECT_EQ(OB_SUCCESS, new_raw_write_leader.palf_handle_impl_->alloc_palf_buffer_iterator(LSN(0), iterator));
SCN replayable_point_scn(last_scn);
EXPECT_EQ(OB_SUCCESS, iterator.next(replayable_point_scn));
EXPECT_EQ(OB_SUCCESS, iterator.next(replayable_point_scn));
EXPECT_EQ(OB_SUCCESS, iterator.next(replayable_point_scn));
EXPECT_EQ(OB_ITER_END, iterator.next(replayable_point_scn, next_min_scn, iterate_end_by_replayable_point));
EXPECT_EQ(iterate_end_by_replayable_point, true);
EXPECT_EQ(next_min_scn, SCN::plus(last_scn, 1));
replayable_point_scn = scns[0];
EXPECT_EQ(OB_SUCCESS, iterator.next(replayable_point_scn));
// scns[1]对应的日志无法吐出
EXPECT_EQ(OB_ITER_END, iterator.next(replayable_point_scn, next_min_scn, iterate_end_by_replayable_point));
EXPECT_EQ(next_min_scn, SCN::plus(scns[0], 1));
EXPECT_EQ(iterator.iterator_impl_.prev_entry_scn_, scns[0]);
// flashback到scns[0]
EXPECT_EQ(OB_SUCCESS, new_raw_write_leader.palf_handle_impl_->inner_flashback(scns[0]));
EXPECT_EQ(OB_SUCCESS, change_access_mode_to_append(new_raw_write_leader));
EXPECT_EQ(OB_SUCCESS, submit_log(new_raw_write_leader, 1, leader_idx, 100));
EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(new_raw_write_leader, new_raw_write_leader.palf_handle_impl_->get_max_lsn()));
// scns[0]对应的日志为raw write, 被flashback了, iterator停在scns[0]的末尾
// 迭代新写入的日志成功
EXPECT_EQ(OB_SUCCESS, iterator.next(SCN::min_scn(), next_min_scn, iterate_end_by_replayable_point));
EXPECT_EQ(OB_ITER_END, iterator.next(SCN::min_scn()));
}
// 验证一条padding LogGroupEntry需要受控回放
{
const int64_t append_id = ATOMIC_AAF(&palf_id_, 1);
PalfHandleImplGuard append_leader;
EXPECT_EQ(OB_SUCCESS, create_paxos_group(append_id, leader_idx, append_leader));
EXPECT_EQ(OB_SUCCESS, submit_log(append_leader, 31, leader_idx, log_entry_size));
const LSN padding_start_lsn = append_leader.get_palf_handle_impl()->get_max_lsn();
EXPECT_EQ(OB_SUCCESS, submit_log(append_leader, 1, leader_idx, log_entry_size));
EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(append_leader, append_leader.get_palf_handle_impl()->get_max_lsn()));
SCN padding_scn = append_leader.get_palf_handle_impl()->get_max_scn();
padding_scn = padding_scn.minus(padding_scn, 1);
const int64_t raw_write_id = ATOMIC_AAF(&palf_id_, 1);
PalfHandleImplGuard raw_write_leader;
EXPECT_EQ(OB_SUCCESS, create_paxos_group(raw_write_id, leader_idx, raw_write_leader));
EXPECT_EQ(OB_SUCCESS, change_access_mode_to_raw_write(raw_write_leader));
EXPECT_EQ(OB_ITER_END, read_and_submit_group_log(append_leader, raw_write_leader));
EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(raw_write_leader, raw_write_leader.get_palf_handle_impl()->get_max_lsn()));
switch_append_to_flashback(raw_write_leader, mode_version);
PalfBufferIterator buff_iterator;
PalfGroupBufferIterator group_buff_iterator;
PalfBufferIterator buff_iterator_padding_start;
PalfGroupBufferIterator group_buff_iterator_padding_start;
EXPECT_EQ(OB_SUCCESS, raw_write_leader.get_palf_handle_impl()->alloc_palf_buffer_iterator(LSN(0), buff_iterator));
EXPECT_EQ(OB_SUCCESS, raw_write_leader.get_palf_handle_impl()->alloc_palf_group_buffer_iterator(LSN(0), group_buff_iterator));
EXPECT_EQ(OB_SUCCESS, raw_write_leader.get_palf_handle_impl()->alloc_palf_buffer_iterator(LSN(0), buff_iterator_padding_start));
EXPECT_EQ(OB_SUCCESS, raw_write_leader.get_palf_handle_impl()->alloc_palf_group_buffer_iterator(LSN(0), group_buff_iterator_padding_start));
SCN next_min_scn;
bool iterate_end_by_replayable_point = false;
EXPECT_EQ(OB_ITER_END, buff_iterator.next(share::SCN::min_scn(), next_min_scn, iterate_end_by_replayable_point));
EXPECT_EQ(true, iterate_end_by_replayable_point);
EXPECT_EQ(OB_ITER_END, group_buff_iterator.next(share::SCN::min_scn(), next_min_scn, iterate_end_by_replayable_point));
EXPECT_EQ(true, iterate_end_by_replayable_point);
// 一共有33条日志,包括padding
SCN replayable_point_scn = padding_scn.minus(padding_scn, 1);
// 直到padding日志受控回放
int ret = OB_SUCCESS;
while (OB_SUCC(buff_iterator.next(replayable_point_scn, next_min_scn, iterate_end_by_replayable_point))) {
}
ret = OB_SUCCESS;
while (OB_SUCC(buff_iterator_padding_start.next(replayable_point_scn, next_min_scn, iterate_end_by_replayable_point))) {
}
EXPECT_EQ(OB_ITER_END, ret);
EXPECT_EQ(true, iterate_end_by_replayable_point);
EXPECT_EQ(next_min_scn, padding_scn);
ret = OB_SUCCESS;
while (OB_SUCC(group_buff_iterator.next(replayable_point_scn, next_min_scn, iterate_end_by_replayable_point))) {
}
ret = OB_SUCCESS;
while (OB_SUCC(group_buff_iterator_padding_start.next(replayable_point_scn, next_min_scn, iterate_end_by_replayable_point))) {
}
EXPECT_EQ(OB_ITER_END, ret);
EXPECT_EQ(true, iterate_end_by_replayable_point);
EXPECT_EQ(next_min_scn, padding_scn);
EXPECT_EQ(false, buff_iterator.iterator_impl_.curr_entry_is_padding_);
EXPECT_EQ(false, group_buff_iterator.iterator_impl_.curr_entry_is_padding_);
// flashback到padding日志尾
EXPECT_EQ(OB_SUCCESS, raw_write_leader.get_palf_handle_impl()->flashback(mode_version, padding_scn, timeout_ts_us));
EXPECT_EQ(OB_SUCCESS, buff_iterator.next(padding_scn, next_min_scn, iterate_end_by_replayable_point));
LogEntry padding_log_entry;
LSN padding_log_lsn;
EXPECT_EQ(OB_SUCCESS, buff_iterator.get_entry(padding_log_entry, padding_log_lsn));
EXPECT_EQ(true, padding_log_entry.check_integrity());
EXPECT_EQ(true, padding_log_entry.header_.is_padding_log_());
EXPECT_EQ(padding_scn, padding_log_entry.header_.scn_);
EXPECT_EQ(false, buff_iterator.iterator_impl_.padding_entry_scn_.is_valid());
EXPECT_EQ(OB_SUCCESS, group_buff_iterator.next(padding_scn, next_min_scn, iterate_end_by_replayable_point));
LogGroupEntry padding_group_entry;
LSN padding_group_lsn;
EXPECT_EQ(OB_SUCCESS, group_buff_iterator.get_entry(padding_group_entry, padding_group_lsn));
EXPECT_EQ(true, padding_group_entry.check_integrity());
EXPECT_EQ(true, padding_group_entry.header_.is_padding_log());
// 对于LogGruopEntry的iterator,在construct_padding_log_entry_后,不会重置padding状态
EXPECT_EQ(true, group_buff_iterator.iterator_impl_.padding_entry_scn_.is_valid());
EXPECT_EQ(padding_log_entry.header_.scn_, padding_group_entry.header_.max_scn_);
// flashback到padding日志头
EXPECT_EQ(OB_SUCCESS, raw_write_leader.get_palf_handle_impl()->flashback(mode_version, padding_scn.minus(padding_scn, 1), timeout_ts_us));
// 预期是由于文件长度导致的OB_ITER_END
EXPECT_EQ(OB_ITER_END, buff_iterator_padding_start.next(padding_scn, next_min_scn, iterate_end_by_replayable_point));
EXPECT_EQ(false, iterate_end_by_replayable_point);
EXPECT_GE(next_min_scn, buff_iterator_padding_start.iterator_impl_.prev_entry_scn_);
EXPECT_EQ(OB_ITER_END, group_buff_iterator_padding_start.next(padding_scn, next_min_scn, iterate_end_by_replayable_point));
EXPECT_EQ(false, iterate_end_by_replayable_point);
EXPECT_GE(next_min_scn, group_buff_iterator_padding_start.iterator_impl_.prev_entry_scn_);
switch_flashback_to_append(raw_write_leader, mode_version);
EXPECT_EQ(OB_SUCCESS, submit_log(raw_write_leader, 100, leader_idx, 1000));
EXPECT_EQ(OB_SUCCESS, buff_iterator_padding_start.next());
EXPECT_EQ(OB_SUCCESS, group_buff_iterator_padding_start.next());
}
}
// Exercises PalfHandleImpl::raw_read on a single-replica palf: rejected
// arguments, a plain read from the initial LSN, a read starting past the
// written data, a read that starts in the middle of the first block, and a
// read below the lower bound once block 0 has been deleted.
TEST_F(TestObSimpleLogClusterSingleReplica, test_raw_read)
{
  SET_CASE_LOG_FILE(TEST_NAME, "test_raw_read");
  OB_LOGGER.set_log_level("TRACE");
  int64_t id = ATOMIC_AAF(&palf_id_, 1);
  int64_t leader_idx = 0;
  PalfHandleImplGuard leader;
  EXPECT_EQ(OB_SUCCESS, create_paxos_group(id, leader_idx, leader));
  PalfOptions opts;
  PalfEnvImpl *palf_env_impl = dynamic_cast<palf::PalfEnvImpl*>(get_cluster()[0]->get_palf_env());
  ASSERT_NE(nullptr, palf_env_impl);
  palf_env_impl->get_options(opts);
  opts.enable_log_cache_ = true;
  palf_env_impl->update_options(opts);
  // The read buffer is allocated only after the last fatal assertion on the
  // environment, so an early return from ASSERT_* cannot leak it.
  const int64_t read_buf_ptr_len = PALF_BLOCK_SIZE;
  char *read_buf_ptr = reinterpret_cast<char*>(mtl_malloc_align(
      LOG_DIO_ALIGN_SIZE, PALF_BLOCK_SIZE + 2 * LOG_DIO_ALIGN_SIZE, "mittest"));
  // Every check below dereferences or offsets this pointer; stop here if the
  // allocation failed instead of doing arithmetic on a null pointer.
  ASSERT_NE(nullptr, read_buf_ptr);
  // Submit 100 log entries. NOTE(review): the original comment said 30K per
  // entry, but submit_log is called with 1000 as its last argument — confirm
  // which size is intended.
  {
    char *read_buf = read_buf_ptr;
    int64_t nbytes = read_buf_ptr_len;
    int64_t out_read_size = 0;
    EXPECT_EQ(OB_SUCCESS, submit_log(leader, 100, leader_idx, 1000));
    EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(leader, leader.palf_handle_impl_->get_max_lsn()));
    const int64_t curr_real_size = leader.palf_handle_impl_->get_max_lsn() - LSN(PALF_INITIAL_LSN_VAL);
    // Deliberately bad arguments: each is offset by one from the good value.
    const LSN invalid_lsn(1);
    char *invalid_read_buf = read_buf_ptr + 1;
    const int64_t invalid_nbytes = 1;
    // Arguments that are not DIO-aligned: every combination containing at
    // least one bad lsn / buffer / length is expected to be rejected.
    palf::LogIOContext io_ctx(palf::LogIOUser::META_INFO);
    EXPECT_EQ(OB_INVALID_ARGUMENT, leader.palf_handle_impl_->raw_read(
        invalid_lsn, invalid_read_buf, invalid_nbytes, out_read_size, io_ctx));
    EXPECT_EQ(OB_INVALID_ARGUMENT, leader.palf_handle_impl_->raw_read(
        LSN(PALF_INITIAL_LSN_VAL), invalid_read_buf, invalid_nbytes, out_read_size, io_ctx));
    EXPECT_EQ(OB_INVALID_ARGUMENT, leader.palf_handle_impl_->raw_read(
        invalid_lsn, read_buf, invalid_nbytes, out_read_size, io_ctx));
    EXPECT_EQ(OB_INVALID_ARGUMENT, leader.palf_handle_impl_->raw_read(
        invalid_lsn, invalid_read_buf, nbytes, out_read_size, io_ctx));
    EXPECT_EQ(OB_INVALID_ARGUMENT, leader.palf_handle_impl_->raw_read(
        LSN(PALF_INITIAL_LSN_VAL), read_buf, invalid_nbytes, out_read_size, io_ctx));
    PALF_LOG(INFO, "raw read success");
    // Successful read: asking for a whole block from the initial LSN returns
    // exactly the bytes written so far.
    EXPECT_EQ(OB_SUCCESS, leader.palf_handle_impl_->raw_read(LSN(PALF_INITIAL_LSN_VAL), read_buf, PALF_BLOCK_SIZE, out_read_size, io_ctx));
    EXPECT_LE(out_read_size, PALF_BLOCK_SIZE);
    EXPECT_EQ(out_read_size, curr_real_size);
    // Read that starts beyond end_lsn.
    PALF_LOG(INFO, "raw read return OB_ERR_OUT_OF_UPPER_BOUND");
    LSN out_of_upper_bound(PALF_BLOCK_SIZE);
    EXPECT_EQ(OB_ERR_OUT_OF_UPPER_BOUND, leader.palf_handle_impl_->raw_read(
        out_of_upper_bound, read_buf, PALF_BLOCK_SIZE, out_read_size, io_ctx));
    // Write enough data to produce two block files.
    EXPECT_EQ(OB_SUCCESS, submit_log(leader, 40, leader_idx, MAX_LOG_BODY_SIZE));
    EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(leader, leader.palf_handle_impl_->get_max_lsn()));
    // Simulate a read that would cross the file boundary: start near the
    // middle of block 0 and request a full block; the expected size is the
    // distance from the start LSN to PALF_BLOCK_SIZE.
    PALF_LOG(INFO, "raw read cross file");
    LSN curr_read_lsn(lower_align(PALF_BLOCK_SIZE/2, LOG_DIO_ALIGN_SIZE));
    int64_t expected_read_size = LSN(PALF_BLOCK_SIZE) - curr_read_lsn;
    io_ctx.iterator_info_.allow_filling_cache_ = false;
    EXPECT_EQ(OB_SUCCESS, leader.palf_handle_impl_->raw_read(
        curr_read_lsn, read_buf, PALF_BLOCK_SIZE, out_read_size, io_ctx));
    EXPECT_EQ(out_read_size, expected_read_size);
    //io_ctx.set_allow_filling_cache(true);
    //EXPECT_EQ(OB_BUF_NOT_ENOUGH, leader.palf_handle_impl_->raw_read(
    //    curr_read_lsn, read_buf, expected_read_size, out_read_size, io_ctx));
    EXPECT_EQ(OB_SUCCESS, leader.palf_handle_impl_->delete_block(0));
    // Simulate the lower bound: block 0 is gone, so the initial LSN is no
    // longer readable.
    PALF_LOG(INFO, "raw read return OB_ERR_OUT_OF_LOWER_BOUND");
    LSN out_of_lower_bound(PALF_INITIAL_LSN_VAL);
    EXPECT_EQ(OB_ERR_OUT_OF_LOWER_BOUND, leader.palf_handle_impl_->raw_read(out_of_lower_bound, read_buf, PALF_BLOCK_SIZE, out_read_size, io_ctx));
  }
  // Non-null is guaranteed by the assertion right after the allocation.
  mtl_free_align(read_buf_ptr);
}
// Checks that the log allocator's in-flight task counters (flying_log_task_ /
// flying_meta_task_) return to zero when the log engine's palf_epoch_ is bumped
// while IO tasks are still queued in the LogIOWorker. Three scenarios are run
// against freshly created palf instances that reuse the same id.
TEST_F(TestObSimpleLogClusterSingleReplica, test_iow_memleak)
{
  SET_CASE_LOG_FILE(TEST_NAME, "test_iow");
  OB_LOGGER.set_log_level("INFO");
  int64_t id = ATOMIC_AAF(&palf_id_, 1);
  int64_t leader_idx = 0;
  // case1: palf epoch has been changed during do_task
  {
    PalfHandleImplGuard leader;
    EXPECT_EQ(OB_SUCCESS, create_paxos_group(id, leader_idx, leader));
    LogIOWorker *iow = leader.get_palf_handle_impl()->log_engine_.log_io_worker_;
    IPalfEnvImpl *palf_env_impl = leader.get_palf_handle_impl()->palf_env_impl_;
    ObILogAllocator *allocator = palf_env_impl->get_log_allocator();
    EXPECT_EQ(OB_SUCCESS, submit_log(leader, 32, leader_idx, log_entry_size));
    EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(leader, leader.palf_handle_impl_->sw_.get_max_lsn()));
    LSN end_lsn = leader.get_palf_handle_impl()->get_end_lsn();
    // NOTE(review): IOTaskCond presumably parks the IO worker until
    // cond_.signal() is called, so that the tasks submitted below pile up
    // behind it — confirm against its definition.
    IOTaskCond cond(id, leader.palf_env_impl_->last_palf_epoch_);
    EXPECT_EQ(OB_SUCCESS, iow->submit_io_task(&cond));
    sleep(1);
    EXPECT_EQ(OB_SUCCESS, leader.get_palf_handle_impl()->set_base_lsn(end_lsn));
    EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, leader_idx, log_entry_size));
    EXPECT_EQ(OB_SUCCESS, leader.get_palf_handle_impl()->log_engine_.submit_purge_throttling_task(PurgeThrottlingType::PURGE_BY_GET_MC_REQ));
    EXPECT_EQ(OB_SUCCESS, submit_log(leader, 2, leader_idx, log_entry_size));
    // Both log and meta tasks must be outstanding before the epoch changes.
    EXPECT_NE(0, allocator->flying_log_task_);
    EXPECT_NE(0, allocator->flying_meta_task_);
    // Bump the epoch, queue more work, then bump it again before releasing
    // the worker.
    leader.get_palf_handle_impl()->log_engine_.palf_epoch_++;
    EXPECT_EQ(OB_SUCCESS, submit_log(leader, 10, leader_idx));
    EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, leader_idx, log_entry_size));
    EXPECT_EQ(OB_SUCCESS, submit_log(leader, 10, leader_idx));
    EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, leader_idx, log_entry_size));
    leader.get_palf_handle_impl()->log_engine_.palf_epoch_++;
    cond.cond_.signal();
    PALF_LOG(INFO, "runlin trace submit log 1");
    // Wait for the worker queue to drain before checking the counters.
    while (iow->queue_.size() > 0) {
      PALF_LOG(INFO, "queue size is not zero", "size", iow->queue_.size());
      sleep(1);
    }
    EXPECT_EQ(0, allocator->flying_log_task_);
    EXPECT_EQ(0, allocator->flying_meta_task_);
  }
  delete_paxos_group(id);
  // case2: palf epoch has been changed during after_consume
  {
    PalfHandleImplGuard leader;
    EXPECT_EQ(OB_SUCCESS, create_paxos_group(id, leader_idx, leader));
    LogIOWorker *iow = leader.get_palf_handle_impl()->log_engine_.log_io_worker_;
    IPalfEnvImpl *palf_env_impl = leader.get_palf_handle_impl()->palf_env_impl_;
    ObILogAllocator *allocator = palf_env_impl->get_log_allocator();
    EXPECT_EQ(OB_SUCCESS, submit_log(leader, 32, leader_idx, log_entry_size));
    EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(leader, leader.palf_handle_impl_->sw_.get_max_lsn()));
    LSN end_lsn = leader.get_palf_handle_impl()->get_end_lsn();
    IOTaskConsumeCond consume_cond(id, leader.palf_env_impl_->last_palf_epoch_);
    EXPECT_EQ(OB_SUCCESS, iow->submit_io_task(&consume_cond));
    sleep(1);
    EXPECT_EQ(OB_SUCCESS, leader.get_palf_handle_impl()->set_base_lsn(end_lsn));
    EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, leader_idx, log_entry_size));
    EXPECT_EQ(OB_SUCCESS, leader.get_palf_handle_impl()->log_engine_.submit_purge_throttling_task(PurgeThrottlingType::PURGE_BY_GET_MC_REQ));
    EXPECT_EQ(OB_SUCCESS, submit_log(leader, 2, leader_idx, log_entry_size));
    EXPECT_NE(0, allocator->flying_log_task_);
    EXPECT_NE(0, allocator->flying_meta_task_);
    leader.get_palf_handle_impl()->log_engine_.palf_epoch_++;
    EXPECT_EQ(OB_SUCCESS, submit_log(leader, 10, leader_idx));
    EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, leader_idx, log_entry_size));
    EXPECT_EQ(OB_SUCCESS, submit_log(leader, 10, leader_idx));
    EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, leader_idx, log_entry_size));
    leader.get_palf_handle_impl()->log_engine_.palf_epoch_++;
    consume_cond.cond_.signal();
    PALF_LOG(INFO, "runlin trace submit log 2");
    // A verify task tagged with the current epoch is queued last; once its
    // count_ becomes non-zero the test treats the earlier tasks as processed.
    IOTaskVerify verify(id, leader.get_palf_handle_impl()->log_engine_.palf_epoch_);
    EXPECT_EQ(OB_SUCCESS, iow->submit_io_task(&verify));
    while (verify.count_ == 0) {
      PALF_LOG(INFO, "queue size is not zero", "size", iow->queue_.size());
      sleep(1);
    }
    EXPECT_EQ(0, allocator->flying_log_task_);
    EXPECT_EQ(0, allocator->flying_meta_task_);
  }
  delete_paxos_group(id);
  // case3: palf epoch has been changed during do_task when there is no io reduce
  {
    PalfHandleImplGuard leader;
    EXPECT_EQ(OB_SUCCESS, create_paxos_group(id, leader_idx, leader));
    LogIOWorker *iow = leader.get_palf_handle_impl()->log_engine_.log_io_worker_;
    IPalfEnvImpl *palf_env_impl = leader.get_palf_handle_impl()->palf_env_impl_;
    // Resolve the concrete env type before the helper thread starts, so a
    // failed cast is reported as a test failure instead of a null dereference
    // inside the thread.
    PalfEnvImpl *impl = dynamic_cast<PalfEnvImpl*>(palf_env_impl);
    ASSERT_NE(nullptr, impl);
    // need_stop is written by this thread and read by throttling_th, so it is
    // only accessed through ATOMIC_STORE / ATOMIC_LOAD to avoid a data race.
    bool need_stop = false;
    std::thread throttling_th([impl, &need_stop](){
      // Keep notifying the IO worker wrapper that write throttling is needed
      // for as long as the case runs.
      while (!ATOMIC_LOAD(&need_stop)) {
        impl->log_io_worker_wrapper_.notify_need_writing_throttling(true);
        usleep(1000);
      }
    });
    ObILogAllocator *allocator = palf_env_impl->get_log_allocator();
    EXPECT_EQ(OB_SUCCESS, submit_log(leader, 32, leader_idx, log_entry_size));
    EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(leader, leader.palf_handle_impl_->sw_.get_max_lsn()));
    LSN end_lsn = leader.get_palf_handle_impl()->get_end_lsn();
    // Same blocking task type as case2, but with the throttling notifier
    // running in the background.
    IOTaskConsumeCond consume_cond(id, leader.palf_env_impl_->last_palf_epoch_);
    EXPECT_EQ(OB_SUCCESS, iow->submit_io_task(&consume_cond));
    sleep(3);
    EXPECT_EQ(OB_SUCCESS, leader.get_palf_handle_impl()->set_base_lsn(end_lsn));
    EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, leader_idx, log_entry_size));
    EXPECT_EQ(OB_SUCCESS, leader.get_palf_handle_impl()->log_engine_.submit_purge_throttling_task(PurgeThrottlingType::PURGE_BY_GET_MC_REQ));
    EXPECT_EQ(OB_SUCCESS, submit_log(leader, 2, leader_idx, log_entry_size));
    EXPECT_NE(0, allocator->flying_log_task_);
    EXPECT_NE(0, allocator->flying_meta_task_);
    leader.get_palf_handle_impl()->log_engine_.palf_epoch_++;
    EXPECT_EQ(OB_SUCCESS, submit_log(leader, 10, leader_idx));
    EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, leader_idx, log_entry_size));
    EXPECT_EQ(OB_SUCCESS, submit_log(leader, 10, leader_idx));
    EXPECT_EQ(OB_SUCCESS, submit_log(leader, 1, leader_idx, log_entry_size));
    leader.get_palf_handle_impl()->log_engine_.palf_epoch_++;
    consume_cond.cond_.signal();
    PALF_LOG(INFO, "runlin trace submit log 3");
    IOTaskVerify verify(id, leader.get_palf_handle_impl()->log_engine_.palf_epoch_);
    EXPECT_EQ(OB_SUCCESS, iow->submit_io_task(&verify));
    while (verify.count_ == 0) {
      PALF_LOG(INFO, "queue size is not zero", "size", iow->queue_.size());
      sleep(1);
    }
    EXPECT_EQ(0, allocator->flying_log_task_);
    EXPECT_EQ(0, allocator->flying_meta_task_);
    // Stop and join the notifier before `leader` goes out of scope.
    ATOMIC_STORE(&need_stop, true);
    throttling_th.join();
  }
}
// Exercises ObLogService::create_ls / check_palf_exist / remove_ls on the first log server
// of the cluster. An ObApplyStatus is pre-registered in the apply service's map under the
// same ls_id, and the test expects the first create_ls to fail with OB_ENTRY_EXIST and to
// leave neither a palf nor the map entry behind, so that a second create_ls succeeds.
TEST_F(TestObSimpleLogClusterSingleReplica, test_log_service_interface)
{
  SET_CASE_LOG_FILE(TEST_NAME, "test_log_service_interface");
  int64_t id = ATOMIC_AAF(&palf_id_, 1);
  ObSimpleLogServer *log_server = dynamic_cast<ObSimpleLogServer*>(get_cluster()[0]);
  ASSERT_NE(nullptr, log_server);
  ObLogService *log_service = &log_server->log_service_;
  ObTenantRole tenant_role;
  tenant_role.value_ = ObTenantRole::Role::PRIMARY_TENANT;
  PalfBaseInfo palf_base_info;
  palf_base_info.generate_by_default();
  ObLogHandler log_handler;
  ObLogRestoreHandler restore_handler;
  ObLogApplyService *apply_service = &log_service->apply_service_;
  ObLSID ls_id(id);

  // Build an ObApplyStatus by hand (raw allocation + placement new) and take one reference
  // before registering it in the apply service's map.
  ObApplyStatus *apply_status = static_cast<ObApplyStatus*>(mtl_malloc(sizeof(ObApplyStatus), "mittest"));
  ASSERT_NE(nullptr, apply_status);
  new (apply_status) ObApplyStatus();
  apply_status->inc_ref();

  EXPECT_EQ(OB_SUCCESS, log_service->start());
  EXPECT_EQ(OB_SUCCESS, apply_service->apply_status_map_.insert(ls_id, apply_status));
  apply_service->is_running_ = true;

  // With the entry already present, create_ls is expected to report OB_ENTRY_EXIST ...
  EXPECT_EQ(OB_ENTRY_EXIST, log_service->create_ls(ls_id, REPLICA_TYPE_FULL, tenant_role, palf_base_info, true, log_handler, restore_handler));
  // ... and afterwards no palf exists for ls_id and the map no longer holds the entry.
  bool is_exist = false;
  EXPECT_EQ(OB_SUCCESS, log_service->check_palf_exist(ls_id, is_exist));
  EXPECT_EQ(is_exist, false);
  EXPECT_EQ(OB_ENTRY_NOT_EXIST, apply_service->apply_status_map_.erase(ls_id));

  // A fresh create_ls now succeeds; repeating it reports OB_ENTRY_EXIST and the palf stays visible.
  EXPECT_EQ(OB_SUCCESS, log_service->create_ls(ls_id, REPLICA_TYPE_FULL, tenant_role, palf_base_info, true, log_handler, restore_handler));
  EXPECT_EQ(OB_ENTRY_EXIST, log_service->create_ls(ls_id, REPLICA_TYPE_FULL, tenant_role, palf_base_info, true, log_handler, restore_handler));
  EXPECT_EQ(OB_SUCCESS, log_service->check_palf_exist(ls_id, is_exist));
  EXPECT_EQ(is_exist, true);

  // The palf env's log directory must not be empty while the ls exists.
  const char *log_dir = log_service->palf_env_->palf_env_impl_.log_dir_;
  bool result = false;
  EXPECT_EQ(OB_SUCCESS, FileDirectoryUtils::is_empty_directory(log_dir, result));
  EXPECT_EQ(false, result);

  EXPECT_EQ(OB_SUCCESS, log_service->remove_ls(ls_id, log_handler, restore_handler));
  // NOTE(review): only the return code is checked here; is_exist is not asserted after
  // remove_ls. Presumably it should be false at this point -- confirm and add the expectation.
  EXPECT_EQ(OB_SUCCESS, log_service->check_palf_exist(ls_id, is_exist));
}
// Creates two paxos groups, switches the second one to raw-write access mode, submits logs
// to the first one, and then runs read_and_submit_group_log(leader, raw_write_leader) from
// two threads at the same time. Each thread is expected to end with OB_ITER_END.
TEST_F(TestObSimpleLogClusterSingleReplica, test_raw_write_concurrent_lsn)
{
  SET_CASE_LOG_FILE(TEST_NAME, "test_raw_write_concurrent_lsn");
  int64_t src_palf_id = ATOMIC_AAF(&palf_id_, 1);
  OB_LOGGER.set_log_level("TRACE");
  int64_t leader_idx = 0;
  PalfHandleImplGuard leader;
  PalfHandleImplGuard raw_write_leader;

  // Source group: created first, written to below.
  EXPECT_EQ(OB_SUCCESS, create_paxos_group(src_palf_id, leader_idx, leader));
  PalfHandleImpl *src_palf_impl = leader.palf_handle_impl_;

  // Destination group: created second, then moved to raw-write access mode.
  const int64_t raw_write_palf_id = ATOMIC_AAF(&palf_id_, 1);
  EXPECT_EQ(OB_SUCCESS, create_paxos_group(raw_write_palf_id, leader_idx, raw_write_leader));
  EXPECT_EQ(OB_SUCCESS, change_access_mode_to_raw_write(raw_write_leader));

  // Fill the source group and wait until everything submitted so far is committed.
  EXPECT_EQ(OB_SUCCESS, submit_log(leader, 100, leader_idx, MAX_LOG_BASE_TYPE));
  SCN src_max_scn = leader.palf_handle_impl_->get_max_scn();
  LSN src_max_lsn = leader.palf_handle_impl_->get_max_lsn();
  EXPECT_EQ(OB_SUCCESS, wait_until_has_committed(leader, leader.palf_handle_impl_->get_max_lsn()));

  ObSimpleLogServer *log_server = dynamic_cast<ObSimpleLogServer*>(get_cluster()[leader_idx]);
  ASSERT_NE(nullptr, log_server);

  // Both workers run the same routine; each sets the tenant on its own thread first.
  auto submit_group_logs = [&]() {
    ObTenantEnv::set_tenant(log_server->get_tenant_base());
    EXPECT_EQ(OB_ITER_END, read_and_submit_group_log(leader, raw_write_leader));
  };
  std::thread first_submitter(submit_group_logs);
  std::thread second_submitter(submit_group_logs);
  first_submitter.join();
  second_submitter.join();
}
} // namespace unittest
} // namespace oceanbase
// Entry point of the test binary: invokes RUN_SIMPLE_LOG_CLUSTER_TEST with this file's
// TEST_NAME ("single_replica").
// NOTE(review): the macro is defined outside this file; presumably it consumes argc/argv
// by name and provides main's return value -- confirm against its definition before
// renaming the parameters or adding statements after it.
int main(int argc, char **argv)
{
RUN_SIMPLE_LOG_CLUSTER_TEST(TEST_NAME);
}