134 lines
7.6 KiB
C
134 lines
7.6 KiB
C
/**
|
|
* Copyright (c) 2021 OceanBase
|
|
* OceanBase CE is licensed under Mulan PubL v2.
|
|
* You can use this software according to the terms and conditions of the Mulan PubL v2.
|
|
* You may obtain a copy of Mulan PubL v2 at:
|
|
* http://license.coscl.org.cn/MulanPubL-2.0
|
|
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
|
|
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
|
|
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
|
|
* See the Mulan PubL v2 for more details.
|
|
*/
|
|
|
|
#define START_ONE_TX_NODE(n1) \
|
|
auto n1 = new ObTxNode(1, ObAddr(ObAddr::VER::IPV4, "127.0.0.1", 8888), bus_); \
|
|
NAMED_DEFER(defer_n1, delete(n1)); \
|
|
ASSERT_EQ(OB_SUCCESS, n1->start()); \
|
|
LOG_INFO("##START_ONE_TX_NODE##", "node.addr_", n1->addr_);
|
|
|
|
#define START_TWO_TX_NODE(n1, n2) \
|
|
auto n1 = new ObTxNode(1, ObAddr(ObAddr::VER::IPV4, "127.0.0.1", 8888), bus_); \
|
|
auto n2 = new ObTxNode(2, ObAddr(ObAddr::VER::IPV4, "127.0.0.2", 8888), bus_); \
|
|
NAMED_DEFER(defer_n1, delete(n1)); \
|
|
NAMED_DEFER(defer_n2, delete(n2)); \
|
|
ASSERT_EQ(OB_SUCCESS, n1->start()); \
|
|
ASSERT_EQ(OB_SUCCESS, n2->start()); \
|
|
LOG_INFO("##START_TWO_TX_NODE##", K(n1->addr_), K(n2->addr_));
|
|
|
|
#define START_TWO_TX_NODE_WITH_LSID(n1, n2, ls_id) \
|
|
auto n1 = new ObTxNode(ls_id, ObAddr(ObAddr::VER::IPV4, "127.0.0.1", 8888), bus_); \
|
|
auto n2 = new ObTxNode(ls_id + 1, ObAddr(ObAddr::VER::IPV4, "127.0.0.2", 8888), bus_); \
|
|
NAMED_DEFER(defer_n1, delete (n1)); \
|
|
NAMED_DEFER(defer_n2, delete (n2)); \
|
|
ASSERT_EQ(OB_SUCCESS, n1->start()); \
|
|
ASSERT_EQ(OB_SUCCESS, n2->start()); \
|
|
LOG_INFO("##START_TWO_TX_NODE##", K(n1->addr_), K(n2->addr_));
|
|
|
|
#define PREPARE_TX(n1, tx) \
|
|
ObTxDescGuard guard = n1->get_tx_guard(); \
|
|
ObTxDesc &tx = guard.get_tx_desc(); \
|
|
LOG_INFO("##PREPARE_TX##", "node", n1->addr_, "tx_id", tx.tx_id_);
|
|
|
|
#define PREPARE_TX_PARAM(tx_param) \
|
|
ObTxParam tx_param; \
|
|
tx_param.timeout_us_ = 1000000; \
|
|
tx_param.access_mode_ = ObTxAccessMode::RW; \
|
|
tx_param.isolation_ = ObTxIsolationLevel::RC; \
|
|
tx_param.cluster_id_ = 100;
|
|
#define GET_READ_SNAPSHOT(n1, tx, tx_param, snapshot) \
|
|
ObTxReadSnapshot snapshot; \
|
|
ASSERT_EQ(OB_SUCCESS, n1->get_read_snapshot(tx, tx_param.isolation_, n1->ts_after_ms(100), snapshot));
|
|
|
|
#define CREATE_IMPLICIT_SAVEPOINT(n1, tx, tx_param, sp) \
|
|
ObTxSEQ sp; \
|
|
ASSERT_EQ(OB_SUCCESS, n1->create_implicit_savepoint(tx, tx_param, sp));
|
|
|
|
#define CREATE_BRANCH_SAVEPOINT(n1, tx, branch, sp) \
|
|
ObTxSEQ sp; \
|
|
ASSERT_EQ(OB_SUCCESS, n1->create_branch_savepoint(tx, branch, sp));
|
|
|
|
#define ROLLBACK_TO_IMPLICIT_SAVEPOINT(n1, tx, sp, timeout_us) \
|
|
n1->rollback_to_implicit_savepoint(tx, sp, n1->ts_after_us(timeout_us), nullptr)
|
|
|
|
#define ROLLBACK_TO_IMPLICIT_SAVEPOINT_X(n1, tx, sp, timeout_us, extra_touched_ls) \
|
|
n1->rollback_to_implicit_savepoint(tx, sp, n1->ts_after_us(timeout_us), extra_touched_ls)
|
|
|
|
#define INJECT_LINK_FAILURE(n1, n2) \
|
|
ASSERT_EQ(OB_SUCCESS, bus_.inject_link_failure(n1->addr_, n2->addr_)); \
|
|
LOG_INFO("##JINECT_LINK_FAILURE##", K(n1->addr_), K(n2->addr_));
|
|
|
|
#define REPAIR_LINK_FAILURE(n1, n2) \
|
|
ASSERT_EQ(OB_SUCCESS, bus_.repair_link_failure(n1->addr_, n2->addr_)); \
|
|
LOG_INFO("##REPAIR_LINK_FAILURE##", K(n1->addr_), K(n2->addr_));
|
|
|
|
#define ASYNC_DO(op_name, op) \
|
|
auto op_name ## _fff_ = [&] { return op; }; \
|
|
auto op_name = test::make_async(op_name ## _fff_); \
|
|
op_name.wait_started(); \
|
|
ASSERT_FALSE(op_name.is_evaled()); \
|
|
LOG_INFO("##ASYNC_DO##", "op_name", #op_name);
|
|
|
|
#define ASYNC_WAIT(op_name, timeout_us, ret) \
|
|
LOG_INFO("##ASYNC_WAIT##", "op_name", #op_name); \
|
|
int ret = 0; \
|
|
do { \
|
|
int i = timeout_us / 1000; \
|
|
while(!op_name.is_evaled() && i-- > 0) { \
|
|
usleep(1000); \
|
|
} \
|
|
if (i > 0) { ret = op_name.get(); } \
|
|
} while(0); \
|
|
LOG_INFO("##ASYNC_DONE##", "op_name", #op_name);
|
|
|
|
#define ROLLBACK_TX(n1, tx) \
|
|
n1->rollback_tx(tx);
|
|
|
|
#define COMMIT_TX(n1, tx, timeout_us) \
|
|
n1->commit_tx(tx, n1->ts_after_us(timeout_us));
|
|
|
|
#define FLUSH_REDO(n1) \
|
|
do { \
|
|
ObLSTxCtxMgr *ls_tx_ctx_mgr = NULL; \
|
|
ASSERT_EQ(n1->txs_.tx_ctx_mgr_.get_ls_tx_ctx_mgr(n1->ls_id_, ls_tx_ctx_mgr), OB_SUCCESS); \
|
|
ObTransID fail_tx_id; \
|
|
ASSERT_EQ(ls_tx_ctx_mgr->traverse_tx_to_submit_redo_log(fail_tx_id, UINT32_MAX), OB_SUCCESS); \
|
|
} while(0)
|
|
|
|
#define SWITCH_TO_FOLLOWER_FORCEDLY(n1) \
|
|
do { \
|
|
ObLSTxCtxMgr *ls_tx_ctx_mgr = NULL; \
|
|
ASSERT_EQ(n1->txs_.tx_ctx_mgr_.get_ls_tx_ctx_mgr(n1->ls_id_, ls_tx_ctx_mgr), OB_SUCCESS); \
|
|
ASSERT_EQ(ls_tx_ctx_mgr->switch_to_follower_forcedly(), OB_SUCCESS); \
|
|
} while(0)
|
|
|
|
#define SWITCH_TO_FOLLOWER_GRACEFULLY(n1) \
|
|
do { \
|
|
ObLSTxCtxMgr *ls_tx_ctx_mgr = NULL; \
|
|
ASSERT_EQ(n1->txs_.tx_ctx_mgr_.get_ls_tx_ctx_mgr(n1->ls_id_, ls_tx_ctx_mgr), OB_SUCCESS); \
|
|
ASSERT_EQ(ls_tx_ctx_mgr->switch_to_follower_gracefully(), OB_SUCCESS); \
|
|
} while(0)
|
|
|
|
#define SWITCH_TO_LEADER(n1) \
|
|
do { \
|
|
ObLSTxCtxMgr *ls_tx_ctx_mgr = NULL; \
|
|
ASSERT_EQ(n1->txs_.tx_ctx_mgr_.get_ls_tx_ctx_mgr(n1->ls_id_, ls_tx_ctx_mgr), OB_SUCCESS); \
|
|
ASSERT_EQ(ls_tx_ctx_mgr->switch_to_leader(), OB_SUCCESS); \
|
|
/* wait state to LEADER */ \
|
|
while (!ls_tx_ctx_mgr->is_master()) { \
|
|
if (REACH_TIME_INTERVAL(500_ms)) { \
|
|
TRANS_LOG(INFO, "wait LS TxCtxMgr to be leader", KPC(ls_tx_ctx_mgr)); \
|
|
} \
|
|
usleep(10_ms); \
|
|
} \
|
|
} while(0)
|