688 lines
21 KiB
C++
688 lines
21 KiB
C++
/**
|
|
* Copyright (c) 2021 OceanBase
|
|
* OceanBase CE is licensed under Mulan PubL v2.
|
|
* You can use this software according to the terms and conditions of the Mulan PubL v2.
|
|
* You may obtain a copy of Mulan PubL v2 at:
|
|
* http://license.coscl.org.cn/MulanPubL-2.0
|
|
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
|
|
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
|
|
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
|
|
* See the Mulan PubL v2 for more details.
|
|
*/
|
|
|
|
#include <gmock/gmock.h>
|
|
#define private public
|
|
#define protected public
|
|
#include "storage/tx/ob_tx_redo_submitter.h"
|
|
#define USING_LOG_PREFIX TRANS
|
|
|
|
|
|
namespace oceanbase
|
|
{
|
|
using namespace ::testing;
|
|
using namespace transaction;
|
|
using namespace share;
|
|
using namespace memtable;
|
|
namespace transaction {
|
|
// Verify that the redo submitter behaves correctly in its typical use cases.
//
// The following part_ctx (ObPartTransCtx) interfaces are replaced by mocks:
// - is_parallel_logging
// - submit_redo_log_out
//
// The following memtable_ctx (ObMemtableCtx) interfaces are replaced by stubs:
// - fill_redo_log (forwarded to the mock)
// - log_submitted (always succeeds)
// - get_log_guard (always succeeds)
//
|
|
struct MockDelegate {
|
|
virtual bool is_parallel_logging() const = 0;
|
|
virtual int submit_redo_log_out(ObTxLogBlock &log_block,
|
|
ObTxLogCb *&log_cb,
|
|
memtable::ObRedoLogSubmitHelper &helper,
|
|
const int64_t replay_hint,
|
|
const bool has_hold_ctx_lock,
|
|
share::SCN &submitted) = 0;
|
|
virtual int fill_redo_log(memtable::ObTxFillRedoCtx &ctx) = 0;
|
|
|
|
};
|
|
struct MockImpl : MockDelegate {
|
|
int a_;
|
|
public:
|
|
MOCK_CONST_METHOD0(is_parallel_logging, bool());
|
|
MOCK_METHOD6(submit_redo_log_out, int(ObTxLogBlock &,
|
|
ObTxLogCb *&,
|
|
memtable::ObRedoLogSubmitHelper &,
|
|
const int64_t,
|
|
const bool,
|
|
share::SCN &));
|
|
MOCK_METHOD1(fill_redo_log, int(memtable::ObTxFillRedoCtx &));
|
|
};
|
|
// Per-thread pointer to the mock that the stubbed ObPartTransCtx and
// ObMemtableCtx member functions in this file forward to.
// ObTestRedoSubmitter::SetUp() points it at the fixture's mock object.
thread_local MockImpl *mock_ptr = nullptr;
|
|
class ObTestRedoSubmitter : public ::testing::Test
|
|
{
|
|
public:
|
|
virtual void SetUp() override
|
|
{
|
|
oceanbase::ObClusterVersion::get_instance().update_data_version(DATA_CURRENT_VERSION);
|
|
ObMallocAllocator::get_instance()->create_and_add_tenant_allocator(1001);
|
|
ObAddr ip_port(ObAddr::VER::IPV4, "119.119.0.1",2023);
|
|
ObCurTraceId::init(ip_port);
|
|
GCONF._ob_trans_rpc_timeout = 500;
|
|
ObClockGenerator::init();
|
|
const testing::TestInfo* const test_info =
|
|
testing::UnitTest::GetInstance()->current_test_info();
|
|
auto test_name = test_info->name();
|
|
_TRANS_LOG(INFO, ">>>> starting test : %s", test_name);
|
|
|
|
// prepare for test
|
|
tx_ctx.exec_info_.state_ = ObTxState::INIT;
|
|
tx_ctx.exec_info_.scheduler_ = common::ObAddr(common::ObAddr::VER::IPV4, "127.0.0.1", 8888);
|
|
tx_ctx.exec_info_.next_log_entry_no_ = 0;
|
|
tx_ctx.cluster_version_ = DATA_CURRENT_VERSION;
|
|
ObLSID ls_id(1001); ObTransID tx_id(777);
|
|
EXPECT_EQ(OB_SUCCESS,tx_ctx.init_log_cbs_(ls_id, tx_id));
|
|
mock_ptr = &mdo_;
|
|
}
|
|
virtual void TearDown() override
|
|
{
|
|
const testing::TestInfo* const test_info =
|
|
testing::UnitTest::GetInstance()->current_test_info();
|
|
auto test_name = test_info->name();
|
|
_TRANS_LOG(INFO, ">>>> tearDown test : %s", test_name);
|
|
ObClockGenerator::destroy();
|
|
ObMallocAllocator::get_instance()->recycle_tenant_allocator(1001);
|
|
}
|
|
MockImpl mdo_;
|
|
ObPartTransCtx tx_ctx;
|
|
ObMemtableCtx mt_ctx;
|
|
};
|
|
|
|
// gmock action standing in for a successful submit_redo_log_out():
// - writes a fixed value into the output SCN,
// - if a log callback was passed, hands it back to the ctx it points to and
//   clears the caller's pointer,
// - always returns OB_SUCCESS.
// The remaining parameters are accepted only to match the mocked signature.
int succ_submit_redo_log_out(ObTxLogBlock & b,
                             ObTxLogCb *& log_cb,
                             memtable::ObRedoLogSubmitHelper &h,
                             const int64_t replay_hint,
                             const bool has_hold_ctx_lock,
                             share::SCN &submitted_scn)
{
  submitted_scn.convert_for_tx(123123123);
  if (NULL != log_cb) {
    ObPartTransCtx *owner_ctx = (ObPartTransCtx*)(log_cb->ctx_);
    owner_ctx->return_log_cb_(log_cb);
    log_cb = NULL;
  }
  return OB_SUCCESS;
}
|
|
|
|
bool ObPartTransCtx::is_parallel_logging() const
|
|
{
|
|
return mock_ptr->is_parallel_logging();
|
|
}
|
|
|
|
// Test replacement for ObPartTransCtx::submit_redo_log_out():
// passes every argument through to the current thread's mock unchanged and
// returns the mock's result.
int ObPartTransCtx::submit_redo_log_out(ObTxLogBlock &log_block,
                                        ObTxLogCb *&log_cb,
                                        memtable::ObRedoLogSubmitHelper &helper,
                                        const int64_t replay_hint,
                                        const bool has_hold_ctx_lock,
                                        share::SCN &submitted_scn)
{
  const int mock_ret = mock_ptr->submit_redo_log_out(log_block,
                                                     log_cb,
                                                     helper,
                                                     replay_hint,
                                                     has_hold_ctx_lock,
                                                     submitted_scn);
  return mock_ret;
}
|
|
}// transaction
|
|
|
|
namespace memtable {
|
|
// Test replacement for ObMemtableCtx::fill_redo_log():
// logs the mock's marker value, then lets the mock decide how `ctx` is
// filled and which code is returned.
int ObMemtableCtx::fill_redo_log(ObTxFillRedoCtx &ctx)
{
  TRANS_LOG(INFO, "", K(mock_ptr->a_));
  const int mock_ret = mock_ptr->fill_redo_log(ctx);
  return mock_ret;
}
|
|
// Test replacement for ObMemtableCtx::log_submitted():
// logs the mock's marker value and reports success; `helper` is not used.
int ObMemtableCtx::log_submitted(const memtable::ObRedoLogSubmitHelper &helper)
{
  (void)helper;
  TRANS_LOG(INFO, "", K(mock_ptr->a_));
  return OB_SUCCESS;
}
|
|
// Test replacement for ObMemtableCtx::get_log_guard():
// logs the mock's marker value, reports the branch of `write_seq` as the
// callback-list index and returns success. `log_guard` is left untouched.
int ObMemtableCtx::get_log_guard(const transaction::ObTxSEQ &write_seq,
                                 memtable::ObCallbackListLogGuard &log_guard,
                                 int& cb_list_idx)
{
  (void)log_guard;
  TRANS_LOG(INFO, "", K(mock_ptr->a_));
  const int result = OB_SUCCESS;
  cb_list_idx = write_seq.get_branch();
  return result;
}
|
|
} // memtable
|
|
namespace transaction {
|
|
// Serial logging: the single fill attempt puts data into the block but ends
// with OB_BLOCK_FROZEN. The block is expected to be submitted once and
// serial_submit(false) is expected to return OB_BLOCK_FROZEN.
TEST_F(ObTestRedoSubmitter, serial_submit_by_writer_thread_BLOCK_FROZEN)
{
  mdo_.a_ = 1;
  auto fill_some_then_frozen = [](ObTxFillRedoCtx &fill_ctx) -> int {
    fill_ctx.fill_count_ = 100;
    fill_ctx.buf_pos_ = 200;
    return OB_BLOCK_FROZEN;
  };
  EXPECT_CALL(mdo_, is_parallel_logging())
    .Times(AtLeast(1))
    .WillRepeatedly(Return(false));
  {
    // fill must happen before submit
    InSequence in_order;
    EXPECT_CALL(mdo_, fill_redo_log(_))
      .Times(1)
      .WillOnce(Invoke(fill_some_then_frozen));
    EXPECT_CALL(mdo_, submit_redo_log_out(_, _, _, _, _, _))
      .Times(1)
      .WillOnce(Invoke(succ_submit_redo_log_out));
    ObTxRedoSubmitter redo_submitter(tx_ctx, mt_ctx);
    EXPECT_EQ(OB_BLOCK_FROZEN, redo_submitter.serial_submit(false));
  }
}
|
|
|
|
// Serial logging: the single fill attempt puts data into the block but ends
// with OB_BUF_NOT_ENOUGH. The block is expected to be submitted once and
// serial_submit(false) is expected to return OB_EAGAIN.
TEST_F(ObTestRedoSubmitter, serial_submit_by_writer_thread_BUF_NOT_ENOUGH)
{
  mdo_.a_ = 2;
  auto fill_some_then_buf_full = [](ObTxFillRedoCtx &fill_ctx) -> int {
    fill_ctx.fill_count_ = 100;
    fill_ctx.buf_pos_ = 200;
    return OB_BUF_NOT_ENOUGH;
  };
  EXPECT_CALL(mdo_, is_parallel_logging())
    .Times(AtLeast(1))
    .WillRepeatedly(Return(false));
  {
    // fill must happen before submit
    InSequence in_order;
    EXPECT_CALL(mdo_, fill_redo_log(_))
      .Times(1)
      .WillOnce(Invoke(fill_some_then_buf_full));
    EXPECT_CALL(mdo_, submit_redo_log_out(_, _, _, _, _, _))
      .Times(1)
      .WillOnce(Invoke(succ_submit_redo_log_out));
    ObTxRedoSubmitter redo_submitter(tx_ctx, mt_ctx);
    EXPECT_EQ(OB_EAGAIN, redo_submitter.serial_submit(false));
  }
}
|
|
|
|
// Serial logging: the single fill attempt succeeds. The block is expected to
// be submitted once and serial_submit(false) is expected to return
// OB_SUCCESS.
TEST_F(ObTestRedoSubmitter, serial_submit_by_writer_thread_ALL_FILLED)
{
  mdo_.a_ = 2;
  auto fill_everything = [](ObTxFillRedoCtx &fill_ctx) -> int {
    fill_ctx.fill_count_ = 100;
    fill_ctx.buf_pos_ = 200;
    return OB_SUCCESS;
  };
  EXPECT_CALL(mdo_, is_parallel_logging())
    .Times(AtLeast(1))
    .WillRepeatedly(Return(false));
  {
    // fill must happen before submit
    InSequence in_order;
    EXPECT_CALL(mdo_, fill_redo_log(_))
      .Times(1)
      .WillOnce(Invoke(fill_everything));
    EXPECT_CALL(mdo_, submit_redo_log_out(_, _, _, _, _, _))
      .Times(1)
      .WillOnce(Invoke(succ_submit_redo_log_out));
    ObTxRedoSubmitter redo_submitter(tx_ctx, mt_ctx);
    EXPECT_EQ(OB_SUCCESS, redo_submitter.serial_submit(false));
  }
}
|
|
|
|
// Serial logging: the single fill attempt puts data into the block but ends
// with OB_ALLOCATE_MEMORY_FAILED. The block is still expected to be
// submitted once and serial_submit(false) is expected to return the error.
TEST_F(ObTestRedoSubmitter, serial_submit_by_writer_thread_UNEXPECTED_ERROR)
{
  mdo_.a_ = 3;
  auto fill_some_then_alloc_fail = [](ObTxFillRedoCtx &fill_ctx) -> int {
    fill_ctx.fill_count_ = 100;
    fill_ctx.buf_pos_ = 200;
    return OB_ALLOCATE_MEMORY_FAILED;
  };
  EXPECT_CALL(mdo_, is_parallel_logging())
    .Times(AtLeast(1))
    .WillRepeatedly(Return(false));
  {
    // fill must happen before submit
    InSequence in_order;
    EXPECT_CALL(mdo_, fill_redo_log(_))
      .Times(1)
      .WillOnce(Invoke(fill_some_then_alloc_fail));
    EXPECT_CALL(mdo_, submit_redo_log_out(_, _, _, _, _, _))
      .Times(1)
      .WillOnce(Invoke(succ_submit_redo_log_out));
    ObTxRedoSubmitter redo_submitter(tx_ctx, mt_ctx);
    EXPECT_EQ(OB_ALLOCATE_MEMORY_FAILED, redo_submitter.serial_submit(false));
  }
}
|
|
|
|
// Serial logging with the "serial final" flag set: the single fill attempt
// succeeds, the block is expected to be submitted once and
// serial_submit(true) is expected to return OB_SUCCESS.
TEST_F(ObTestRedoSubmitter, serial_submit_by_writer_thread_serial_final)
{
  mdo_.a_ = 3;
  auto fill_everything = [](ObTxFillRedoCtx &fill_ctx) -> int {
    fill_ctx.fill_count_ = 100;
    fill_ctx.buf_pos_ = 200;
    return OB_SUCCESS;
  };
  EXPECT_CALL(mdo_, is_parallel_logging())
    .Times(AtLeast(1))
    .WillRepeatedly(Return(false));
  {
    // fill must happen before submit
    InSequence in_order;
    EXPECT_CALL(mdo_, fill_redo_log(_))
      .Times(1)
      .WillOnce(Invoke(fill_everything));
    EXPECT_CALL(mdo_, submit_redo_log_out(_, _, _, _, _, _))
      .Times(1)
      .WillOnce(Invoke(succ_submit_redo_log_out));
    ObTxRedoSubmitter redo_submitter(tx_ctx, mt_ctx);
    EXPECT_EQ(OB_SUCCESS, redo_submitter.serial_submit(true/*serial final*/));
  }
}
|
|
|
|
// Parallel logging: the single fill attempt puts data into the block but
// ends with OB_BLOCK_FROZEN. The block is expected to be submitted once and
// parallel_submit() is expected to return OB_BLOCK_FROZEN.
TEST_F(ObTestRedoSubmitter, parallel_submit_by_writer_thread_BLOCK_FROZEN)
{
  mdo_.a_ = 4;
  auto fill_some_then_frozen = [](ObTxFillRedoCtx &fill_ctx) -> int {
    fill_ctx.fill_count_ = 100;
    fill_ctx.buf_pos_ = 200;
    return OB_BLOCK_FROZEN;
  };
  EXPECT_CALL(mdo_, is_parallel_logging())
    .Times(AtLeast(1))
    .WillRepeatedly(Return(true));
  {
    // fill must happen before submit
    InSequence in_order;
    EXPECT_CALL(mdo_, fill_redo_log(_))
      .Times(1)
      .WillOnce(Invoke(fill_some_then_frozen));
    EXPECT_CALL(mdo_, submit_redo_log_out(_, _, _, _, _, _))
      .Times(1)
      .WillOnce(Invoke(succ_submit_redo_log_out));
    ObTxRedoSubmitter redo_submitter(tx_ctx, mt_ctx);
    ObTxSEQ seq_of_writer(101, 0);
    EXPECT_EQ(OB_BLOCK_FROZEN, redo_submitter.parallel_submit(seq_of_writer));
  }
}
|
|
|
|
// Parallel logging: the single fill attempt puts data into the block but
// ends with OB_ITER_END (the case this test's name calls "blocked by other
// list"). The block is expected to be submitted once and parallel_submit()
// is expected to return OB_ITER_END.
TEST_F(ObTestRedoSubmitter, parallel_submit_by_writer_thread_BLOCKED_BY_OTHER_LIST)
{
  mdo_.a_ = 4;
  auto fill_some_then_iter_end = [](ObTxFillRedoCtx &fill_ctx) -> int {
    fill_ctx.fill_count_ = 100;
    fill_ctx.buf_pos_ = 200;
    return OB_ITER_END;
  };
  EXPECT_CALL(mdo_, is_parallel_logging())
    .Times(AtLeast(1))
    .WillRepeatedly(Return(true));
  {
    // fill must happen before submit
    InSequence in_order;
    EXPECT_CALL(mdo_, fill_redo_log(_))
      .Times(1)
      .WillOnce(Invoke(fill_some_then_iter_end));
    EXPECT_CALL(mdo_, submit_redo_log_out(_, _, _, _, _, _))
      .Times(1)
      .WillOnce(Invoke(succ_submit_redo_log_out));
    ObTxRedoSubmitter redo_submitter(tx_ctx, mt_ctx);
    ObTxSEQ seq_of_writer(101, 0);
    EXPECT_EQ(OB_ITER_END, redo_submitter.parallel_submit(seq_of_writer));
  }
}
|
|
|
|
// Parallel logging: the single fill attempt succeeds. The block is expected
// to be submitted once and parallel_submit() is expected to return
// OB_SUCCESS.
TEST_F(ObTestRedoSubmitter, parallel_submit_by_writer_thread_ALL_FILLED)
{
  mdo_.a_ = 4;
  auto fill_everything = [](ObTxFillRedoCtx &fill_ctx) -> int {
    fill_ctx.fill_count_ = 100;
    fill_ctx.buf_pos_ = 200;
    return OB_SUCCESS;
  };
  EXPECT_CALL(mdo_, is_parallel_logging())
    .Times(AtLeast(1))
    .WillRepeatedly(Return(true));
  {
    // fill must happen before submit
    InSequence in_order;
    EXPECT_CALL(mdo_, fill_redo_log(_))
      .Times(1)
      .WillOnce(Invoke(fill_everything));
    EXPECT_CALL(mdo_, submit_redo_log_out(_, _, _, _, _, _))
      .Times(1)
      .WillOnce(Invoke(succ_submit_redo_log_out));
    ObTxRedoSubmitter redo_submitter(tx_ctx, mt_ctx);
    ObTxSEQ seq_of_writer(101, 0);
    EXPECT_EQ(OB_SUCCESS, redo_submitter.parallel_submit(seq_of_writer));
  }
}
|
|
|
|
// Parallel logging: the single fill attempt puts data into the block but
// ends with OB_ALLOCATE_MEMORY_FAILED. The block is still expected to be
// submitted once and parallel_submit() is expected to return the error.
TEST_F(ObTestRedoSubmitter, parallel_submit_by_writer_thread_UNEXPECTED_ERROR)
{
  mdo_.a_ = 3;
  auto fill_some_then_alloc_fail = [](ObTxFillRedoCtx &fill_ctx) -> int {
    fill_ctx.fill_count_ = 100;
    fill_ctx.buf_pos_ = 200;
    return OB_ALLOCATE_MEMORY_FAILED;
  };
  EXPECT_CALL(mdo_, is_parallel_logging())
    .Times(AtLeast(1))
    .WillRepeatedly(Return(true));
  {
    // fill must happen before submit
    InSequence in_order;
    EXPECT_CALL(mdo_, fill_redo_log(_))
      .Times(1)
      .WillOnce(Invoke(fill_some_then_alloc_fail));
    EXPECT_CALL(mdo_, submit_redo_log_out(_, _, _, _, _, _))
      .Times(1)
      .WillOnce(Invoke(succ_submit_redo_log_out));
    ObTxRedoSubmitter redo_submitter(tx_ctx, mt_ctx);
    ObTxSEQ seq_of_writer(101, 0);
    EXPECT_EQ(OB_ALLOCATE_MEMORY_FAILED, redo_submitter.parallel_submit(seq_of_writer));
  }
}
|
|
|
|
// Freeze-driven submit under parallel logging. Expected call order:
//   1. fill puts data into the block and reports OB_BLOCK_FROZEN,
//   2. that block is submitted,
//   3. a second fill adds nothing and reports OB_BLOCK_FROZEN again.
// submit_for_freeze() is expected to return OB_BLOCK_FROZEN.
TEST_F(ObTestRedoSubmitter, submit_by_freeze_parallel_logging_BLOCK_FROZEN)
{
  mdo_.a_ = 3;
  auto fill_some_then_frozen = [](ObTxFillRedoCtx &fill_ctx) -> int {
    fill_ctx.fill_count_ = 100;
    fill_ctx.buf_pos_ = 200;
    return OB_BLOCK_FROZEN;
  };
  auto fill_nothing_frozen = [](ObTxFillRedoCtx &fill_ctx) -> int {
    fill_ctx.fill_count_ = 0;
    fill_ctx.buf_pos_ = 0;
    return OB_BLOCK_FROZEN;
  };
  EXPECT_CALL(mdo_, is_parallel_logging())
    .Times(AtLeast(1))
    .WillRepeatedly(Return(true));
  {
    InSequence in_order;
    EXPECT_CALL(mdo_, fill_redo_log(_))
      .Times(1)
      .WillOnce(Invoke(fill_some_then_frozen));
    EXPECT_CALL(mdo_, submit_redo_log_out(_, _, _, _, _, _))
      .Times(1)
      .WillOnce(Invoke(succ_submit_redo_log_out));
    EXPECT_CALL(mdo_, fill_redo_log(_))
      .Times(1)
      .WillOnce(Invoke(fill_nothing_frozen));
    ObTxRedoSubmitter redo_submitter(tx_ctx, mt_ctx);
    EXPECT_EQ(OB_BLOCK_FROZEN, redo_submitter.submit_for_freeze());
  }
}
|
|
|
|
// Freeze-driven submit under parallel logging where progress ends up blocked
// by a frozen block. submit_for_freeze() is expected to return
// OB_BLOCK_FROZEN after two blocks have been submitted.
TEST_F(ObTestRedoSubmitter, submit_by_freeze_parallel_logging_FROZEN_BLOCKED_BY_OTHERS)
{
  mdo_.a_ = 3;
  auto fill_some_then_again = [](ObTxFillRedoCtx &fill_ctx) -> int {
    fill_ctx.fill_count_ = 100;
    fill_ctx.buf_pos_ = 200;
    return OB_EAGAIN;
  };
  auto fill_some_then_frozen = [](ObTxFillRedoCtx &fill_ctx) -> int {
    fill_ctx.fill_count_ = 100;
    fill_ctx.buf_pos_ = 200;
    return OB_BLOCK_FROZEN;
  };
  auto fill_nothing_frozen = [](ObTxFillRedoCtx &fill_ctx) -> int {
    fill_ctx.fill_count_ = 0;
    fill_ctx.buf_pos_ = 0;
    return OB_BLOCK_FROZEN;
  };
  EXPECT_CALL(mdo_, is_parallel_logging())
    .Times(AtLeast(1))
    .WillRepeatedly(Return(true));
  {
    InSequence in_order;
    // step 1: the first list flushes the part that can be flushed
    EXPECT_CALL(mdo_, fill_redo_log(_))
      .Times(1)
      .WillOnce(Invoke(fill_some_then_again));
    // step 2: that block goes out to the log service layer
    EXPECT_CALL(mdo_, submit_redo_log_out(_, _, _, _, _, _))
      .Times(1)
      .WillOnce(Invoke(succ_submit_redo_log_out));
    // step 3: retry with the other lists; the list with the small epoch
    // flushes some data and then hits a frozen block
    EXPECT_CALL(mdo_, fill_redo_log(_))
      .Times(1)
      .WillOnce(Invoke(fill_some_then_frozen));
    // step 4: that block goes out to the log service layer as well
    EXPECT_CALL(mdo_, submit_redo_log_out(_, _, _, _, _, _))
      .Times(1)
      .WillOnce(Invoke(succ_submit_redo_log_out));
    // step 5: another retry; nothing more can be submitted because the list
    // with the minimum epoch is blocked by the frozen block
    EXPECT_CALL(mdo_, fill_redo_log(_))
      .Times(1)
      .WillOnce(Invoke(fill_nothing_frozen));
    ObTxRedoSubmitter redo_submitter(tx_ctx, mt_ctx);
    EXPECT_EQ(OB_BLOCK_FROZEN, redo_submitter.submit_for_freeze());
  }
}
|
|
|
|
// Freeze-driven submit under parallel logging where the first fill runs out
// of buffer. Three fill/submit rounds are expected, followed by a final fill
// that finds nothing left; submit_for_freeze() is expected to return
// OB_SUCCESS.
TEST_F(ObTestRedoSubmitter, submit_by_freeze_parallel_logging_BUF_NOT_ENOUGH)
{
  mdo_.a_ = 3;
  auto fill_some_then_buf_full = [](ObTxFillRedoCtx &fill_ctx) -> int {
    fill_ctx.fill_count_ = 100;
    fill_ctx.buf_pos_ = 200;
    return OB_BUF_NOT_ENOUGH;
  };
  auto fill_some_then_again = [](ObTxFillRedoCtx &fill_ctx) -> int {
    fill_ctx.fill_count_ = 100;
    fill_ctx.buf_pos_ = 200;
    return OB_EAGAIN;
  };
  auto fill_nothing_success = [](ObTxFillRedoCtx &fill_ctx) -> int {
    fill_ctx.fill_count_ = 0;
    fill_ctx.buf_pos_ = 0;
    return OB_SUCCESS;
  };
  EXPECT_CALL(mdo_, is_parallel_logging())
    .Times(AtLeast(1))
    .WillRepeatedly(Return(true));
  {
    InSequence in_order;
    // round 1: the list with the small epoch is large, the buffer fills up
    EXPECT_CALL(mdo_, fill_redo_log(_))
      .Times(1)
      .WillOnce(Invoke(fill_some_then_buf_full));
    EXPECT_CALL(mdo_, submit_redo_log_out(_, _, _, _, _, _))
      .Times(1)
      .WillOnce(Invoke(succ_submit_redo_log_out));
    // round 2: the list with the small epoch is now completely flushed
    EXPECT_CALL(mdo_, fill_redo_log(_))
      .Times(1)
      .WillOnce(Invoke(fill_some_then_again));
    EXPECT_CALL(mdo_, submit_redo_log_out(_, _, _, _, _, _))
      .Times(1)
      .WillOnce(Invoke(succ_submit_redo_log_out));
    // round 3: the second list with a small epoch is completely flushed
    EXPECT_CALL(mdo_, fill_redo_log(_))
      .Times(1)
      .WillOnce(Invoke(fill_some_then_again));
    EXPECT_CALL(mdo_, submit_redo_log_out(_, _, _, _, _, _))
      .Times(1)
      .WillOnce(Invoke(succ_submit_redo_log_out));
    // final fill: every list has been filled, nothing is left
    EXPECT_CALL(mdo_, fill_redo_log(_))
      .Times(1)
      .WillOnce(Invoke(fill_nothing_success));
    ObTxRedoSubmitter redo_submitter(tx_ctx, mt_ctx);
    EXPECT_EQ(OB_SUCCESS, redo_submitter.submit_for_freeze());
  }
}
|
|
|
|
// Freeze-driven submit under parallel logging: the single fill attempt
// succeeds, the block is expected to be submitted once and
// submit_for_freeze() is expected to return OB_SUCCESS.
TEST_F(ObTestRedoSubmitter, submit_by_freeze_parallel_logging_ALL_FILLED)
{
  mdo_.a_ = 3;
  auto fill_everything = [](ObTxFillRedoCtx &fill_ctx) -> int {
    fill_ctx.fill_count_ = 100;
    fill_ctx.buf_pos_ = 200;
    return OB_SUCCESS;
  };
  EXPECT_CALL(mdo_, is_parallel_logging())
    .Times(AtLeast(1))
    .WillRepeatedly(Return(true));
  {
    // fill must happen before submit
    InSequence in_order;
    EXPECT_CALL(mdo_, fill_redo_log(_))
      .Times(1)
      .WillOnce(Invoke(fill_everything));
    EXPECT_CALL(mdo_, submit_redo_log_out(_, _, _, _, _, _))
      .Times(1)
      .WillOnce(Invoke(succ_submit_redo_log_out));
    ObTxRedoSubmitter redo_submitter(tx_ctx, mt_ctx);
    EXPECT_EQ(OB_SUCCESS, redo_submitter.submit_for_freeze());
  }
}
|
|
|
|
// fill() into a caller-provided log block under serial logging: the single
// fill attempt puts data into the block but ends with OB_BLOCK_FROZEN. One
// submit is expected, fill() is expected to return OB_BLOCK_FROZEN and the
// helper is expected to hold no callbacks afterwards.
TEST_F(ObTestRedoSubmitter, submit_by_switch_leader_or_on_commit_serial_logging)
{
  mdo_.a_ = 3;
  auto fill_some_then_frozen = [](ObTxFillRedoCtx &fill_ctx) -> int {
    fill_ctx.fill_count_ = 100;
    fill_ctx.buf_pos_ = 200;
    return OB_BLOCK_FROZEN;
  };
  EXPECT_CALL(mdo_, is_parallel_logging())
    .Times(AtLeast(1))
    .WillRepeatedly(Return(false));
  {
    // fill must happen before submit
    InSequence in_order;
    EXPECT_CALL(mdo_, fill_redo_log(_))
      .Times(1)
      .WillOnce(Invoke(fill_some_then_frozen));
    EXPECT_CALL(mdo_, submit_redo_log_out(_, _, _, _, _, _))
      .Times(1)
      .WillOnce(Invoke(succ_submit_redo_log_out));
    ObTxRedoSubmitter redo_submitter(tx_ctx, mt_ctx);
    // caller-owned log block that fill() writes into
    ObTxLogBlock block;
    ObTransID trans_id(101);
    block.get_header().init(1, DATA_CURRENT_VERSION, 101, trans_id, ObAddr());
    block.init_for_fill();
    memtable::ObRedoLogSubmitHelper submit_helper;
    EXPECT_EQ(OB_BLOCK_FROZEN, redo_submitter.fill(block, submit_helper));
    EXPECT_EQ(0, submit_helper.callbacks_.count());
  }
}
|
|
|
|
// fill() into a caller-provided log block under parallel logging.
// Four fill attempts are scripted: the first three end with
// OB_BLOCK_FROZEN, OB_ITER_END and OB_EAGAIN, the fourth succeeds and
// records one callback scope in the helper. Three submits are expected,
// fill() is expected to return OB_SUCCESS and the helper is expected to hold
// exactly one callback.
TEST_F(ObTestRedoSubmitter, submit_by_switch_leader_or_on_commit_parallel_logging_ALL_FILLED)
{
  mdo_.a_ = 3;
  auto first_fill = [](ObTxFillRedoCtx &fill_ctx) -> int {
    fill_ctx.fill_count_ = 100;
    fill_ctx.buf_pos_ = 200;
    return OB_BLOCK_FROZEN;
  };
  auto second_fill = [](ObTxFillRedoCtx &fill_ctx) -> int {
    fill_ctx.fill_count_ = 200;
    fill_ctx.buf_pos_ = 333;
    return OB_ITER_END;
  };
  auto third_fill = [](ObTxFillRedoCtx &fill_ctx) -> int {
    fill_ctx.fill_count_ = 300;
    fill_ctx.buf_pos_ = 444;
    return OB_EAGAIN;
  };
  auto fourth_fill = [](ObTxFillRedoCtx &fill_ctx) -> int {
    fill_ctx.fill_count_ = 400;
    fill_ctx.buf_pos_ = 555;
    // leave one callback scope behind for the caller to observe
    ObCallbackScope cb_scope;
    fill_ctx.helper_->callbacks_.push_back(cb_scope);
    return OB_SUCCESS;
  };
  EXPECT_CALL(mdo_, is_parallel_logging())
    .Times(AtLeast(1))
    .WillRepeatedly(Return(true));
  {
    EXPECT_CALL(mdo_, fill_redo_log(_))
      .Times(4)
      .WillOnce(Invoke(first_fill))
      .WillOnce(Invoke(second_fill))
      .WillOnce(Invoke(third_fill))
      .WillOnce(Invoke(fourth_fill));
    EXPECT_CALL(mdo_, submit_redo_log_out(_, _, _, _, _, _))
      .Times(3)
      .WillRepeatedly(Invoke(succ_submit_redo_log_out));
    ObTxRedoSubmitter redo_submitter(tx_ctx, mt_ctx);
    // caller-owned log block that fill() writes into
    ObTxLogBlock block;
    ObTransID trans_id(101);
    block.get_header().init(1, DATA_CURRENT_VERSION, 101, trans_id, ObAddr());
    block.init_for_fill();
    memtable::ObRedoLogSubmitHelper submit_helper;
    EXPECT_EQ(OB_SUCCESS, redo_submitter.fill(block, submit_helper));
    EXPECT_EQ(submit_helper.callbacks_.count(), 1);
  }
}
|
|
|
|
// fill() into a caller-provided log block under parallel logging, ending
// blocked. Four fill attempts are scripted: the first three end with
// OB_BLOCK_FROZEN, OB_ITER_END and OB_EAGAIN, the fourth adds nothing and
// reports OB_BLOCK_FROZEN. Three submits are expected, fill() is expected to
// return OB_BLOCK_FROZEN and the helper is expected to hold no callbacks.
TEST_F(ObTestRedoSubmitter, submit_by_switch_leader_or_on_commit_parallel_logging_BLOCKED)
{
  mdo_.a_ = 3;
  auto first_fill = [](ObTxFillRedoCtx &fill_ctx) -> int {
    fill_ctx.fill_count_ = 100;
    fill_ctx.buf_pos_ = 200;
    return OB_BLOCK_FROZEN;
  };
  auto second_fill = [](ObTxFillRedoCtx &fill_ctx) -> int {
    fill_ctx.fill_count_ = 200;
    fill_ctx.buf_pos_ = 333;
    return OB_ITER_END;
  };
  auto third_fill = [](ObTxFillRedoCtx &fill_ctx) -> int {
    fill_ctx.fill_count_ = 300;
    fill_ctx.buf_pos_ = 444;
    return OB_EAGAIN;
  };
  auto fourth_fill = [](ObTxFillRedoCtx &fill_ctx) -> int {
    fill_ctx.fill_count_ = 0;
    fill_ctx.buf_pos_ = 0;
    return OB_BLOCK_FROZEN;
  };
  EXPECT_CALL(mdo_, is_parallel_logging())
    .Times(AtLeast(1))
    .WillRepeatedly(Return(true));
  {
    EXPECT_CALL(mdo_, fill_redo_log(_))
      .Times(4)
      .WillOnce(Invoke(first_fill))
      .WillOnce(Invoke(second_fill))
      .WillOnce(Invoke(third_fill))
      .WillOnce(Invoke(fourth_fill));
    EXPECT_CALL(mdo_, submit_redo_log_out(_, _, _, _, _, _))
      .Times(3)
      .WillRepeatedly(Invoke(succ_submit_redo_log_out));
    ObTxRedoSubmitter redo_submitter(tx_ctx, mt_ctx);
    // caller-owned log block that fill() writes into
    ObTxLogBlock block;
    ObTransID trans_id(101);
    block.get_header().init(1, DATA_CURRENT_VERSION, 101, trans_id, ObAddr());
    block.init_for_fill();
    memtable::ObRedoLogSubmitHelper submit_helper;
    EXPECT_EQ(OB_BLOCK_FROZEN, redo_submitter.fill(block, submit_helper));
    EXPECT_EQ(submit_helper.callbacks_.count(), 0);
  }
}
|
|
|
|
// Every fill attempt adds nothing and reports OB_ERR_TOO_BIG_ROWSIZE.
// The error is expected to surface unchanged from submit_all(),
// parallel_submit() and fill(), with nothing counted as submitted. No
// expectation is set on submit_redo_log_out in this test.
TEST_F(ObTestRedoSubmitter, submit_ROW_SIZE_TOO_LARGE)
{
  auto fill_row_too_big = [](ObTxFillRedoCtx &fill_ctx) -> int {
    fill_ctx.fill_count_ = 0;
    fill_ctx.buf_pos_ = 0;
    return OB_ERR_TOO_BIG_ROWSIZE;
  };
  EXPECT_CALL(mdo_, is_parallel_logging())
    .Times(AtLeast(1))
    .WillRepeatedly(Return(true));
  {
    // one fill attempt per entry point exercised below
    EXPECT_CALL(mdo_, fill_redo_log(_))
      .Times(3)
      .WillRepeatedly(Invoke(fill_row_too_big));
    {
      // entry point 1: submit_all
      ObTxRedoSubmitter redo_submitter(tx_ctx, mt_ctx);
      EXPECT_EQ(OB_ERR_TOO_BIG_ROWSIZE, redo_submitter.submit_all(true));
      EXPECT_EQ(redo_submitter.get_submitted_cnt(), 0);
    }
    {
      // entry point 2: parallel_submit
      ObTxRedoSubmitter redo_submitter(tx_ctx, mt_ctx);
      ObTxSEQ seq_of_write(100,200);
      EXPECT_EQ(OB_ERR_TOO_BIG_ROWSIZE, redo_submitter.parallel_submit(seq_of_write));
      EXPECT_EQ(redo_submitter.get_submitted_cnt(), 0);
    }
    {
      // entry point 3: fill into a caller-owned log block
      ObTxRedoSubmitter redo_submitter(tx_ctx, mt_ctx);
      ObTxLogBlock block;
      ObTransID trans_id(101);
      block.get_header().init(1, DATA_CURRENT_VERSION, 101, trans_id, ObAddr());
      block.init_for_fill();
      memtable::ObRedoLogSubmitHelper submit_helper;
      EXPECT_EQ(OB_ERR_TOO_BIG_ROWSIZE, redo_submitter.fill(block, submit_helper));
      EXPECT_EQ(redo_submitter.get_submitted_cnt(), 0);
      EXPECT_EQ(submit_helper.callbacks_.count(), 0);
    }
  }
}
|
|
|
|
} // transaction
|
|
} // oceanbase
|
|
|
|
int main(int argc, char **argv)
|
|
{
|
|
const char *log_name = "test_redo_submitter.log";
|
|
system("rm -rf test_redo_submitter.log*");
|
|
ObLogger &logger = ObLogger::get_logger();
|
|
logger.set_file_name(log_name, true, false,
|
|
log_name,
|
|
log_name,
|
|
log_name);
|
|
logger.set_log_level(OB_LOG_LEVEL_DEBUG);
|
|
::testing::InitGoogleTest(&argc, argv);
|
|
return RUN_ALL_TESTS();
|
|
}
|