/**
 * Copyright (c) 2021 OceanBase
 * OceanBase CE is licensed under Mulan PubL v2.
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
 * You may obtain a copy of Mulan PubL v2 at:
 *          http://license.coscl.org.cn/MulanPubL-2.0
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 * See the Mulan PubL v2 for more details.
 */
|
|
|
|
#include <gtest/gtest.h>
|
|
#include <thread>
|
|
#define private public
|
|
#define protected public
|
|
#include "storage/tx/ob_trans_define.h"
|
|
#include "storage/tx/ob_trans_part_ctx.h"
|
|
#include "storage/tx/ob_trans_service.h"
|
|
#define USING_LOG_PREFIX TRANS
|
|
#include "../mock_utils/async_util.h"
|
|
#include "test_tx_dsl.h"
|
|
#include "tx_node.h"
|
|
namespace oceanbase
|
|
{
|
|
using namespace ::testing;
|
|
using namespace transaction;
|
|
using namespace share;
|
|
|
|
namespace concurrent_control
|
|
{
|
|
int check_sequence_set_violation(const concurrent_control::ObWriteFlag,
|
|
const int64_t,
|
|
const ObTransID,
|
|
const blocksstable::ObDmlFlag,
|
|
const int64_t,
|
|
const ObTransID,
|
|
const blocksstable::ObDmlFlag,
|
|
const int64_t)
|
|
{
|
|
return OB_SUCCESS;
|
|
}
|
|
} // namespace concurrent_control
|
|
|
|
class ObTestTxCtx : public ::testing::Test
|
|
{
|
|
public:
|
|
virtual void SetUp() override
|
|
{
|
|
ObMallocAllocator::get_instance()->create_and_add_tenant_allocator(1001);
|
|
const uint64_t tv = ObTimeUtility::current_time();
|
|
ObCurTraceId::set(&tv);
|
|
GCONF._ob_trans_rpc_timeout = 500;
|
|
ObClockGenerator::init();
|
|
const testing::TestInfo *const test_info =
|
|
testing::UnitTest::GetInstance()->current_test_info();
|
|
auto test_name = test_info->name();
|
|
_TRANS_LOG(INFO, ">>>> starting test : %s", test_name);
|
|
}
|
|
virtual void TearDown() override
|
|
{
|
|
const testing::TestInfo *const test_info =
|
|
testing::UnitTest::GetInstance()->current_test_info();
|
|
auto test_name = test_info->name();
|
|
_TRANS_LOG(INFO, ">>>> tearDown test : %s", test_name);
|
|
ObClockGenerator::destroy();
|
|
ObMallocAllocator::get_instance()->recycle_tenant_allocator(1001);
|
|
}
|
|
MsgBus bus_;
|
|
};
|
|
|
|
// DelayAbort: drives one transaction through a leader -> follower -> leader
// switch after its redo log has been applied, and checks that
//   * the context keeps its redo LSNs, a valid max applied log ts and state
//     INIT across every role change,
//   * after the second leader takeover the context is flagged force-abort
//     while no state log has been submitted,
//   * a subsequent write and the final commit both return OB_TRANS_KILLED.
TEST_F(ObTestTxCtx, DelayAbort)
{
  ObTxPartList saved_parts;
  START_ONE_TX_NODE(n1);
  PREPARE_TX_PARAM(tx_param);
  PREPARE_TX(n1, tx);

  // Phase 1: snapshot + implicit savepoint, then one write that must succeed.
  {
    ObTxReadSnapshot read_snapshot;
    ASSERT_EQ(OB_SUCCESS,
              n1->get_read_snapshot(tx, tx_param.isolation_, n1->ts_after_ms(100), read_snapshot));
    CREATE_IMPLICIT_SAVEPOINT(n1, tx, tx_param, sp1);
    ASSERT_EQ(OB_SUCCESS, n1->create_implicit_savepoint(tx, tx_param, sp1));
    ASSERT_EQ(OB_SUCCESS, n1->write(tx, read_snapshot, 100, 112));
  }

  // Keep a copy of the participant list and clear it on the descriptor;
  // the copy is assigned back right before the commit at the end of the test.
  ASSERT_EQ(OB_SUCCESS, saved_parts.assign(tx.parts_));
  tx.parts_.reset();

  ObPartTransCtx *tx_ctx = nullptr;
  ObLSTxCtxMgr *ls_tx_ctx_mgr = nullptr;
  ASSERT_EQ(OB_SUCCESS, n1->txs_.tx_ctx_mgr_.get_ls_tx_ctx_mgr(n1->ls_id_, ls_tx_ctx_mgr));

  // Phase 2 (leader): submit a redo log, wait until it is applied, verify the context.
  {
    ASSERT_EQ(OB_SUCCESS, ls_tx_ctx_mgr->get_tx_ctx(tx.tx_id_, false /*for_replay*/, tx_ctx));
    GCONF._private_buffer_size = 1;
    // ASSERT_EQ(OB_SUCCESS, tx_ctx->submit_log_impl_(ObTxLogType::TX_REDO_LOG));
    ASSERT_EQ(OB_SUCCESS, tx_ctx->submit_redo_log(false /*is_freeze*/));
    TRANS_LOG(INFO, "[TEST] after submit redo", K(tx_ctx->trans_id_),
              K(tx_ctx->exec_info_.max_applied_log_ts_));

    n1->wait_all_redolog_applied();
    TRANS_LOG(INFO, "[TEST] after on_success redo", K(tx_ctx->trans_id_),
              K(tx_ctx->exec_info_.max_applied_log_ts_));

    ASSERT_EQ(true, tx_ctx->exec_info_.redo_lsns_.count() > 0);
    ASSERT_EQ(true, tx_ctx->exec_info_.max_applied_log_ts_.is_valid());
    ASSERT_EQ(ObTxState::INIT, tx_ctx->exec_info_.state_);
    ASSERT_EQ(false, tx_ctx->is_follower_());
    ASSERT_EQ(OB_SUCCESS, ls_tx_ctx_mgr->revert_tx_ctx(tx_ctx));
  }

  // Phase 3: force the log stream's tx ctx manager into the follower role.
  ls_tx_ctx_mgr->switch_to_follower_forcedly();

  // The context must now report follower, with redo info and state unchanged.
  {
    ASSERT_EQ(OB_SUCCESS, ls_tx_ctx_mgr->get_tx_ctx(tx.tx_id_, true /*for_replay*/, tx_ctx));
    ASSERT_EQ(true, tx_ctx->exec_info_.redo_lsns_.count() > 0);
    ASSERT_EQ(true, tx_ctx->exec_info_.max_applied_log_ts_.is_valid());
    ASSERT_EQ(ObTxState::INIT, tx_ctx->exec_info_.state_);
    ASSERT_EQ(true, tx_ctx->is_follower_());
    ASSERT_EQ(OB_SUCCESS, ls_tx_ctx_mgr->revert_tx_ctx(tx_ctx));
  }

  // Phase 4: take leadership back and wait for outstanding redo logs to be applied.
  ASSERT_EQ(OB_SUCCESS, ls_tx_ctx_mgr->switch_to_leader());
  n1->wait_all_redolog_applied();

  // After takeover: leader again, still INIT, marked force-abort, no state log submitted.
  {
    ASSERT_EQ(OB_SUCCESS, ls_tx_ctx_mgr->get_tx_ctx(tx.tx_id_, false /*for_replay*/, tx_ctx));
    TRANS_LOG(INFO, "[TEST] after leader takeover", K(tx_ctx->trans_id_),
              K(tx_ctx->exec_info_.state_), K(tx_ctx->exec_info_.max_applied_log_ts_));
    ASSERT_EQ(true, tx_ctx->exec_info_.redo_lsns_.count() > 0);
    ASSERT_EQ(true, tx_ctx->exec_info_.max_applied_log_ts_.is_valid());
    ASSERT_EQ(true, tx_ctx->sub_state_.is_force_abort());
    ASSERT_EQ(false, tx_ctx->sub_state_.is_state_log_submitted());
    ASSERT_EQ(ObTxState::INIT, tx_ctx->exec_info_.state_);
    ASSERT_EQ(false, tx_ctx->is_follower_());
    ASSERT_EQ(OB_SUCCESS, ls_tx_ctx_mgr->revert_tx_ctx(tx_ctx));
  }

  // Phase 5: a new write on the same transaction is expected to be rejected as killed.
  {
    ObTxReadSnapshot read_snapshot;
    ASSERT_EQ(OB_SUCCESS,
              n1->get_read_snapshot(tx, tx_param.isolation_, n1->ts_after_ms(100), read_snapshot));
    CREATE_IMPLICIT_SAVEPOINT(n1, tx, tx_param, sp2);
    ASSERT_EQ(OB_SUCCESS, n1->create_implicit_savepoint(tx, tx_param, sp2));
    ASSERT_EQ(OB_TRANS_KILLED, n1->write(tx, read_snapshot, 1110, 1120));
  }

  // Phase 6: restore the participant list; commit must report the kill, release must succeed.
  ASSERT_EQ(OB_SUCCESS, tx.parts_.assign(saved_parts));
  ASSERT_EQ(OB_TRANS_KILLED, n1->commit_tx(tx, n1->ts_after_ms(500)));
  ASSERT_EQ(OB_SUCCESS, n1->release_tx(tx));

  // Cleanup: wait for all tx contexts to be destroyed, then hand the manager back.
  ASSERT_EQ(OB_SUCCESS, n1->wait_all_tx_ctx_is_destoryed());
  ASSERT_EQ(OB_SUCCESS, n1->txs_.tx_ctx_mgr_.revert_ls_tx_ctx_mgr(ls_tx_ctx_mgr));
}
|
|
} // namespace oceanbase
|
|
|
|
int main(int argc, char **argv)
|
|
{
|
|
int64_t tx_id = 21533427;
|
|
uint64_t h = murmurhash(&tx_id, sizeof(tx_id), 0);
|
|
system("rm -rf test_tx_ctx*.log*");
|
|
ObLogger &logger = ObLogger::get_logger();
|
|
logger.set_file_name("test_tx_ctx.log", true, false,
|
|
"test_tx_ctx_rs.log", // rs
|
|
"test_tx_ctx_election.log", // election
|
|
"test_tx_ctx_audit.log"); // audit
|
|
logger.set_log_level(OB_LOG_LEVEL_DEBUG);
|
|
::testing::InitGoogleTest(&argc, argv);
|
|
TRANS_LOG(INFO, "mmhash:", K(h));
|
|
return RUN_ALL_TESTS();
|
|
}
|