Files
oceanbase/unittest/storage/tx/it/test_tx_free_route.cpp

1095 lines
38 KiB
C++

/**
* Copyright (c) 2023 OceanBase
* OceanBase CE is licensed under Mulan PubL v2.
* You can use this software according to the terms and conditions of the Mulan PubL v2.
* You may obtain a copy of Mulan PubL v2 at:
* http://license.coscl.org.cn/MulanPubL-2.0
* THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
* EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
* MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
* See the Mulan PubL v2 for more details.
*/
#include <gtest/gtest.h>
#define private public
#define protected public
#define USING_LOG_PREFIX TRANS
#include "tx_node.h"
#include "test_tx_dsl.h"
namespace oceanbase
{
using namespace ::testing;
using namespace transaction;
using namespace share;
static ObSharedMemAllocMgr MTL_MEM_ALLOC_MGR;
namespace share {
int ObTenantTxDataAllocator::init(const char *label)
{
int ret = OB_SUCCESS;
ObMemAttr mem_attr;
throttle_tool_ = &(MTL_MEM_ALLOC_MGR.share_resource_throttle_tool());
if (OB_FAIL(slice_allocator_.init(
storage::TX_DATA_SLICE_SIZE, OB_MALLOC_NORMAL_BLOCK_SIZE, block_alloc_, mem_attr))) {
SHARE_LOG(WARN, "init slice allocator failed", KR(ret));
} else {
slice_allocator_.set_nway(ObTenantTxDataAllocator::ALLOC_TX_DATA_MAX_CONCURRENCY);
is_inited_ = true;
}
return ret;
}
int ObMemstoreAllocator::init()
{
throttle_tool_ = &MTL_MEM_ALLOC_MGR.share_resource_throttle_tool();
return arena_.init();
}
int ObMemstoreAllocator::AllocHandle::init()
{
int ret = OB_SUCCESS;
uint64_t tenant_id = 1;
ObSharedMemAllocMgr *mtl_alloc_mgr = &MTL_MEM_ALLOC_MGR;
ObMemstoreAllocator &host = mtl_alloc_mgr->memstore_allocator();
(void)host.init_handle(*this);
return ret;
}
}; // namespace share
namespace omt {
bool the_ctrl_of_enable_transaction_free_route = true;
ObTenantConfig *ObTenantConfigMgr::get_tenant_config_with_lock(const uint64_t tenant_id,
const uint64_t fallback_tenant_id /* = 0 */,
const uint64_t timeout_us /* = 0 */) const
{
static ObTenantConfig cfg;
cfg._enable_transaction_internal_routing = the_ctrl_of_enable_transaction_free_route;
cfg.writing_throttling_trigger_percentage = 100;
return &cfg;
}
}
namespace transaction {
/*
* tx free route:
*
* 1) Request ---(1)----> Node 1 ----> Response
* Request : Action + Tx-State-List
* (1):
* 1. session.sync_tx_state
* 2. session.post_tx_state_sync
* 3. handle Action
* 4. session.calc_tx_state
* 5. Response.add_sync_state
* 6. Response.add_result
*
* -----------------------------------------------
* Mock Proxy
* Proxy:
* 1. add_backends_(ObServerNode, ObServerNode)
* 2. handle(request, response)
* route_table.route(request.type, server)
* server.handle(request, response)
* {
* Route_Table : route(RequestType, ObServerNode)
* session_state: state_list_ { ObString, }
* }
* Mock ObServer
* 1. handle(request, response)
* sync_tx_state(request.sate_)
* 2. push_state(PushStateMsg, Resp)
* sync_tx_state(msg)
* 3. check_tx_alive(Msg)
*/
struct SyncTxState {
enum T { STATIC, DYNAMIC, PARTS, EXTRA } type_;
ObString data_;
const char* type_name_(T t) const {
static const char * const names[] = {
"STATIC", "DYNAMIC", "PARTS", "EXTRA"
};
return names[t];
}
TO_STRING_KV("type", type_name_(type_), K(data_.length()))
};
struct ObReq {
enum T { START_TX, COMMIT_TX, ROLLBACK_TX, READ, WRITE, DUMMY_WRITE, SAVEPOINT, ROLLBACK_SAVEPOINT, RELEASE_SAVEPOINT } type_;
union {
int64_t read_key_;
int64_t write_key_;
const char *savepoint_name_;
};
int stick_hash_ = 0;
bool is_serializable_isolation_ = false;
int64_t write_value_;
bool txn_free_route_support_;
ObSEArray<SyncTxState, 4> tx_state_list_;
void *hook_ctx_ = NULL;
const char* type_name_(T t) const {
static const char * const names[] = {
"START_TX", "COMMIT_TX", "ROLLBACK_TX", "READ", "WRITE", "DUMMY_WRITE", "SAVEPOINT", "ROLLBACK_SAVEPOINT", "RELEASE_SAVEPOINT" };
return names[t];
}
TO_STRING_KV("type", type_name_(type_), K_(read_key), K_(write_key), K_(txn_free_route_support), K_(tx_state_list));
public:
void set_stick_hash(int i) { stick_hash_ = i; }
static ObReq mk_start_tx() { ObReq req; req.type_ = START_TX; return req; }
static ObReq mk_commit_tx() { ObReq req; req.type_ = COMMIT_TX; return req; }
static ObReq mk_rollback_tx() { ObReq req; req.type_ = ROLLBACK_TX; return req; }
static ObReq mk_savepoint(const char *name) {
ObReq req;
req.type_ = SAVEPOINT;
req.savepoint_name_ = name;
return req;
}
static ObReq mk_rollback_savepoint(const char *name) {
ObReq req;
req.type_ = ROLLBACK_SAVEPOINT;
req.savepoint_name_ = name;
return req;
}
static ObReq mk_release_savepoint(const char *name) {
ObReq req;
req.type_ = RELEASE_SAVEPOINT;
req.savepoint_name_ = name;
return req;
}
static ObReq mk_read(const int64_t read_key) {
ObReq req;
req.type_ = READ;
req.is_serializable_isolation_ = false;
req.read_key_ = read_key;
return req;
}
static ObReq mk_serializable_read(const int64_t read_key) {
ObReq req;
req.type_ = READ;
req.is_serializable_isolation_ = true;
req.read_key_ = read_key;
return req;
}
static ObReq mk_write(const int64_t write_key, const int64_t write_value) {
ObReq req;
req.type_ = WRITE;
req.write_key_ = write_key;
req.write_value_ = write_value;
return req;
}
static ObReq mk_dummy_write(const int64_t write_key, const int64_t write_value) {
ObReq req;
req.type_ = DUMMY_WRITE;
req.write_key_ = write_key;
req.write_value_ = write_value;
return req;
}
};
struct ObResp {
int64_t read_value_;
ObTransID tx_id_;
bool in_txn_;
bool can_free_route_;
ObSEArray<SyncTxState, 4> tx_state_list_;
int ret_;
TO_STRING_KV(K_(ret), K_(tx_id), K_(in_txn), K_(can_free_route), K_(tx_state_list));
};
class MockObServer {
public:
MockObServer(const int64_t ls_id, const char*addr, const int32_t port, MsgBus &msg_bus)
: addr_(ObAddr(ObAddr::VER::IPV4, addr, port)),
tx_node_ptr_(new ObTxNode(ls_id, addr_, msg_bus)),
tx_node_(*tx_node_ptr_),
allocator_(), session_()
{
session_.test_init(1, 1111, 2222, &allocator_);
tx_node_.extra_msg_handler_ = [this](int type, void *msg) -> int {
return this->handle_msg_(type, msg);
};
}
~MockObServer() {
if (session_.get_tx_desc()) {
tx_node_.release_tx(*session_.get_tx_desc());
session_.get_tx_desc() = NULL;
}
delete tx_node_ptr_;
}
int start() { return tx_node_.start(); }
public:
int handle(ObReq&, ObResp&);
int receive_push_tx_state(ObTxFreeRoutePushState &state);
int receive_check_tx_alive(ObTxFreeRouteCheckAliveMsg &msg);
public:
enum HOOK { RECV_REQ, POST_SYNC_STATE, PRE_HANDLE, POST_HANDLE, POST_CALC_FREE_ROUTE, PRE_SEND_RESP, MAX_HOOK_NUM};
void setup_hook(HOOK hk, std::function<void(MockObServer &, ObReq&, ObResp&)> func)
{
hooks_[hk] = func;
}
void reset_hooks() {
for(int i =0; i<MAX_HOOK_NUM; i++) hooks_[i] = nullptr;
}
int check_tx_exist(ObTransID *tx_id) {
int ret = OB_SUCCESS;
ObTxDesc *it = NULL;
if (OB_FAIL(tx_node_.txs_.tx_desc_mgr_.get(*tx_id, it))) {
} else if (it) {
tx_node_.txs_.tx_desc_mgr_.revert(*it);
}
return ret;
}
int check_tx_sanity() {
int ret = OB_SUCCESS;
auto tx_id = session_.tx_desc_->tx_id_;
ObTxDesc *it = NULL;
if (OB_FAIL(tx_node_.txs_.tx_desc_mgr_.get(tx_id, it))) {
} else if (it != session_.tx_desc_) {
ret = OB_ERR_UNEXPECTED;
}
if (it) tx_node_.txs_.tx_desc_mgr_.revert(*it);
return ret;
}
private:
std::function<void(MockObServer&, ObReq&, ObResp&)> hooks_[MAX_HOOK_NUM];
private:
int do_handle_(ObReq &req, ObResp &resp);
int assign_resp_tx_state_(ObResp &resp, ObTxDesc *tx_desc, ObTxnFreeRouteCtx &ctx);
int handle_msg_(int type, void *msg);
int handle_msg_check_alive_resp__(ObTxFreeRouteCheckAliveRespMsg *msg);
ObAddr addr_;
ObTxNode *tx_node_ptr_;
ObTxNode &tx_node_;
ObMalloc allocator_;
sql::ObSQLSessionInfo session_;
TO_STRING_KV(K_(tx_node), K_(session));
};
class MockObProxy {
public:
MockObProxy(const char*addr, const int32_t port, MsgBus &msg_bus)
: addr_(ObAddr::VER::IPV4, addr, port), msg_bus_(msg_bus),
txn_free_route_support_(true),
can_free_route_(false),
tx_state_list_(),
in_txn_(false),
tx_start_backend_(NULL),
backends_()
{}
public:
struct TxStateInfo {
SyncTxState state_;
int version_;
TO_STRING_KV(K_(state), K_(version))
};
ObAddr addr_;
MsgBus &msg_bus_;
bool txn_free_route_support_;
bool can_free_route_;
TxStateInfo tx_state_list_[4];
struct BackendInfo {
BackendInfo(): server_(NULL), tx_state_version_() {}
MockObServer *server_;
int tx_state_version_[4];
TO_STRING_KV(K_(server),
"TxVer_0", tx_state_version_[0],
"TxVer_1", tx_state_version_[1],
"TxVer_2", tx_state_version_[2],
"TxVer_3", tx_state_version_[3]
);
};
public:
int add_backend(MockObServer &server);
int handle(ObReq&, ObResp&);
public:
enum HOOK { POST_ROUTE, PRE_HANDLE_RESP, POST_HANDLE_RESP, MAX_HOOK_NUM };
void setup_hook(HOOK hk, std::function<void(MockObProxy&, ObReq&, ObResp&, BackendInfo*)> func)
{
hooks_[hk] = func;
}
void reset_hooks() {
for(int i =0; i<MAX_HOOK_NUM; i++) hooks_[i] = nullptr;
}
private:
std::function<void(MockObProxy&, ObReq&, ObResp&, BackendInfo*)> hooks_[MAX_HOOK_NUM];
private:
int route_(ObReq &req, BackendInfo *&backend);
int do_handle_(ObReq &req, ObResp &resp);
int assign_req_tx_state_(ObReq &req, BackendInfo *backend);
int update_backend_tx_state_(ObResp &resp, BackendInfo *backend);
bool in_txn_;
BackendInfo *tx_start_backend_;
ObSEArray<BackendInfo, 4> backends_;
};
int MockObProxy::add_backend(MockObServer &server)
{
int ret = OB_SUCCESS;
BackendInfo b;
b.server_ = &server;
backends_.push_back(b);
return ret;
}
#define CALL_PROXY_HOOK(HK_NAME) \
do { \
if (hooks_[HK_NAME]) { hooks_[HK_NAME](*this, req, resp, backend); } \
} while(0)
int MockObProxy::handle(ObReq &req, ObResp &resp)
{
int ret = OB_SUCCESS;
BackendInfo *backend = NULL;
route_(req, backend);
CALL_PROXY_HOOK(POST_ROUTE);
assign_req_tx_state_(req, backend);
TRANS_LOG(INFO, "[MockObProxy] send req:", K(req));
backend->server_->handle(req, resp);
TRANS_LOG(INFO, "[MockObProxy] receive resp:", K(resp));
CALL_PROXY_HOOK(PRE_HANDLE_RESP);
if (resp.in_txn_ && !in_txn_) {
in_txn_ = true;
tx_start_backend_ = backend;
} else if (!resp.in_txn_ && in_txn_) {
in_txn_ = false;
tx_start_backend_ = NULL;
}
if (in_txn_) {
can_free_route_ = resp.can_free_route_;
}
update_backend_tx_state_(resp, backend);
CALL_PROXY_HOOK(POST_HANDLE_RESP);
return ret;
}
int MockObProxy::route_(ObReq &req, BackendInfo *&backend)
{
int ret = OB_SUCCESS;
backend = NULL;
bool free_route = !in_txn_ || can_free_route_;
if (free_route) {
switch(req.type_) {
case ObReq::T::READ:
backend = &backends_[req.read_key_ % backends_.count()];
break;
case ObReq::T::WRITE:
case ObReq::T::DUMMY_WRITE:
backend = &backends_[req.write_key_ % backends_.count()];
break;
default:
if (in_txn_) {
backend = tx_start_backend_;
} else if (req.stick_hash_){
backend = &backends_[req.stick_hash_ % backends_.count()];
} else {
backend = &backends_[random() % backends_.count()];
}
}
} else {
backend = tx_start_backend_;
}
if (!backend) {
TRANS_LOG(ERROR, "[MockProxy] tx backend is null");
ob_abort();
}
TRANS_LOG(INFO, "[MockObProxy] choose backend:", KPC(backend), K(req), K_(in_txn),
"TxVer_0", tx_state_list_[0].version_,
"TxVer_1", tx_state_list_[1].version_,
"TxVer_2", tx_state_list_[2].version_,
"TxVer_3", tx_state_list_[3].version_);
return ret;
}
int MockObProxy::assign_req_tx_state_(ObReq &req, BackendInfo *backend)
{
int ret = OB_SUCCESS;
req.tx_state_list_.reset();
for (int i =0; i < 4; i++) {
if (backend->tx_state_version_[i] < tx_state_list_[i].version_) {
req.tx_state_list_.push_back(tx_state_list_[i].state_);
}
}
req.txn_free_route_support_ = in_txn_ ? can_free_route_ : txn_free_route_support_;
return ret;
}
int MockObProxy::update_backend_tx_state_(ObResp &resp, BackendInfo *backend)
{
int ret = OB_SUCCESS;
for (int i =0; i < 4; i++) {
if (backend->tx_state_version_[i] > tx_state_list_[i].version_) {
TRANS_LOG(ERROR, "!!bug");
ob_abort();
} else {
backend->tx_state_version_[i] = tx_state_list_[i].version_;
}
}
for (int i =0; i < resp.tx_state_list_.count(); i++ ) {
auto &st = resp.tx_state_list_[i];
auto t = st.type_;
tx_state_list_[t].version_ +=1;
tx_state_list_[t].state_ = st;
backend->tx_state_version_[t] = tx_state_list_[t].version_;
}
return ret;
}
#define LOWER_STATIC static
#define LOWER_DYNAMIC dynamic
#define LOWER_PARTS parts
#define LOWER_EXTRA extra
#define UPPER_static STATIC
#define UPPER_dynamic DYNAMIC
#define UPPER_parts PARTS
#define UPPER_extra EXTRA
#define CALL_SERVER_HOOK(HK_NAME) \
do { \
if (hooks_[HK_NAME]) { hooks_[HK_NAME](*this, req, resp); } \
} while(0)
int MockObServer::handle(ObReq &req, ObResp &resp)
{
GCONF.self_addr_ = addr_;
int ret = OB_SUCCESS;
CALL_SERVER_HOOK(RECV_REQ);
auto &tx_desc = session_.get_tx_desc();
session_.set_txn_free_route(req.txn_free_route_support_);
auto &free_route_ctx = session_.get_txn_free_route_ctx();
for (int i =0; i < req.tx_state_list_.count(); i++ ) {
auto &s = req.tx_state_list_.at(i);
const char *buf = s.data_.ptr();
int64_t len = s.data_.length();
int64_t pos = 0;
switch(s.type_) {
#define TX_STATE_UPDATE__(T, tn) \
case SyncTxState::T: \
if (OB_SUCC(ret) && \
OB_FAIL(tx_node_.txn_free_route__update_##tn##_state(session_.get_sessid(), tx_desc, free_route_ctx, buf, len, pos, session_.get_data_version()))) { \
TRANS_LOG(ERROR, "update txn state fail", K(ret), "type", #T); \
} else if (pos != len) { \
TRANS_LOG(WARN, "[maybe] pos != len, consume buffer incomplete", K(ret), K(pos), K(len), "state_type", #T); \
} \
break;
#define TX_STATE_UPDATE_(T, tn) TX_STATE_UPDATE__(T, tn)
#define TX_STATE_UPDATE(T) TX_STATE_UPDATE_(T, CONCAT(LOWER_, T))
LST_DO(TX_STATE_UPDATE, (), STATIC, DYNAMIC, PARTS, EXTRA);
#undef TX_STATE_UPDATE_
#undef TX_STATE_UPDATE
default:
ob_abort();
}
}
CALL_SERVER_HOOK(POST_SYNC_STATE);
session_.post_sync_session_info();
CALL_SERVER_HOOK(PRE_HANDLE);
do_handle_(req, resp);
// ObTenantEnv::set_tenant(&tx_node_.tenant_);
// omt::ObTenantConfigGuard tenant_config(TENANT_CONF(MTL_ID()));
// bool tnt_enable = tenant_config->enable_transaction_free_route;
// TRANS_LOG(INFO, "[MockObServer] tenant_confi:", K(tenant_config.is_valid()), K(tnt_enable));
CALL_SERVER_HOOK(POST_HANDLE);
TRANS_LOG(INFO, "[MockObServer] before calc_txn_free_route", "my_addr", addr_,
"txn_free_route_ctx", session_.get_txn_free_route_ctx());
if (OB_FAIL(tx_node_.calc_txn_free_route(tx_desc, session_.get_txn_free_route_ctx()))) {
TRANS_LOG(ERROR, "calc_txn_free_route fail", K(ret));
ob_abort();
} else {
TRANS_LOG(INFO, "[MockObServer] after calc_txn_free_route", "my_addr", addr_,
"txn_free_route_ctx", session_.get_txn_free_route_ctx());
}
CALL_SERVER_HOOK(POST_CALC_FREE_ROUTE);
if (OB_FAIL(assign_resp_tx_state_(resp, tx_desc, session_.get_txn_free_route_ctx()))) {
TRANS_LOG(ERROR, "calc_txn_free_route fail", K(ret));
ob_abort();
}
CALL_SERVER_HOOK(PRE_SEND_RESP);
return ret;
}
int MockObServer::do_handle_(ObReq &req, ObResp &resp)
{
int ret = OB_SUCCESS;
// switch req.action:
// call tx_node's read/write/start_tx/commit_tx/rollback_tx ...
auto &tx_desc = session_.get_tx_desc();
if (!tx_desc) {
ret = tx_node_.acquire_tx(tx_desc, session_.get_sessid());
if (OB_FAIL(ret)) {
resp.ret_ = ret;
TRANS_LOG(WARN, "acquire tx failed", K(ret));
return ret;
}
}
switch(req.type_) {
case ObReq::T::START_TX: {
PREPARE_TX_PARAM(tx_param)
ret = tx_node_.start_tx(*tx_desc, tx_param);
if (OB_SUCC(ret)) { resp.tx_id_ = tx_desc->tx_id_; }
}
break;
case ObReq::T::COMMIT_TX:
ret = tx_node_.commit_tx(*tx_desc, 2 * 1000 * 1000L);
tx_node_.release_tx(*tx_desc);
tx_desc = NULL;
break;
case ObReq::T::ROLLBACK_TX:
ret = tx_node_.rollback_tx(*tx_desc);
tx_node_.release_tx(*tx_desc);
tx_desc = NULL;
break;
case ObReq::T::SAVEPOINT:
ret = tx_node_.create_explicit_savepoint(*tx_desc, ObString(req.savepoint_name_), 0, false);
break;
case ObReq::T::ROLLBACK_SAVEPOINT:
ret = tx_node_.rollback_to_explicit_savepoint(*tx_desc, ObString(req.savepoint_name_), 1 * 1000 * 1000, 0);
break;
case ObReq::T::RELEASE_SAVEPOINT:
ret = tx_node_.release_explicit_savepoint(*tx_desc, ObString(req.savepoint_name_), 0);
break;
case ObReq::T::READ:
if (req.is_serializable_isolation_) {
ret = tx_node_.read(*tx_desc, req.read_key_, resp.read_value_, ObTxIsolationLevel::SERIAL);
} else {
ret = tx_node_.read(*tx_desc, req.read_key_, resp.read_value_);
}
break;
case ObReq::T::WRITE: {
PREPARE_TX_PARAM(tx_param);
ret = tx_node_.atomic_write(*tx_desc, req.write_key_, req.write_value_, 5 * 1000 * 1000L, tx_param);
}
break;
case ObReq::T::DUMMY_WRITE: {
PREPARE_TX_PARAM(tx_param);
ObTxSEQ sp;
ret = tx_node_.create_implicit_savepoint(*tx_desc, tx_param, sp, true);
}
break;
default:
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(ERROR, "unexpected type", K(ret), K(req.type_));
ob_abort();
}
resp.ret_ = ret;
return ret;
}
int MockObServer::assign_resp_tx_state_(ObResp &resp, ObTxDesc *tx_desc, ObTxnFreeRouteCtx &ctx)
{
int ret = OB_SUCCESS;
resp.tx_state_list_.reset();
// for each changed state
// encode to tx_state obj
// push tx_state obj to resp.tx_state_list_
#define ENCODE_TX_STATE_(t) \
if (OB_SUCC(ret) && ctx.t##_changed_) { \
int64_t len = tx_node_.txn_free_route__get_##t##_state_serialize_size(tx_desc, ctx); \
char *buf = (char*)ob_malloc(len, ObMemAttr(OB_SERVER_TENANT_ID, ObNewModIds::TEST)); \
int64_t pos = 0; \
if (OB_FAIL(tx_node_.txn_free_route__serialize_##t##_state(session_.get_sessid(), tx_desc, ctx, buf, len, pos))) { \
TRANS_LOG(ERROR, "serialize fail", K(ret), K(tx_desc)); \
} else { \
SyncTxState state; \
state.data_ = ObString(len, buf); \
state.type_ = SyncTxState::T::UPPER_##t; \
ret = resp.tx_state_list_.push_back(state); \
} \
}
#define ENCODE_TX_STATE(t) ENCODE_TX_STATE_(t)
LST_DO(ENCODE_TX_STATE, (), static, dynamic, parts, extra);
#undef ENCODE_TX_STATE_
#undef ENCODE_TX_STATE
resp.in_txn_ = OB_NOT_NULL(tx_desc) && tx_desc->in_tx_for_free_route();
resp.can_free_route_ = ctx.can_free_route();
return ret;
}
int MockObServer::handle_msg_(int type, void *msg)
{
switch(type) {
case TX_MSG_TYPE::TX_FREE_ROUTE_CHECK_ALIVE_RESP:
return handle_msg_check_alive_resp__((ObTxFreeRouteCheckAliveRespMsg*)msg);
default:
ob_abort();
}
return OB_SUCCESS;
}
int MockObServer::handle_msg_check_alive_resp__(ObTxFreeRouteCheckAliveRespMsg *msg)
{
int ret = OB_SUCCESS;
if (OB_FAIL(msg->ret_)) {
if (ret == OB_TRANS_CTX_NOT_EXIST) {
TRANS_LOG(INFO, "[MockObserver] [tx free route] check txn alive : txn has quit", K(ret), KPC(session_.tx_desc_));
ret = OB_SUCCESS;
if (session_.tx_desc_) {
if (session_.tx_desc_->tx_id_ != msg->tx_id_) {
TRANS_LOG(ERROR, "bug", K(msg->tx_id_));
ob_abort();
}
tx_node_.release_tx(*session_.tx_desc_);
session_.tx_desc_ = NULL;
}
} else {
TRANS_LOG(INFO, "[MockObserver] [tx free route] check txn alive : fail", K(ret), KPC(session_.tx_desc_));
}
} else {
TRANS_LOG(INFO, "[MockObserver] [tx free route] check txn alive success", KPC(session_.tx_desc_), K_(session));
}
return ret;
}
} // transaction
class ObTestTxFreeRoute : public ::testing::Test
{
public:
virtual void SetUp() override
{
oceanbase::ObClusterVersion::get_instance().update_data_version(DATA_CURRENT_VERSION);
ObMallocAllocator::get_instance()->create_and_add_tenant_allocator(1001);
ObAddr ip_port(ObAddr::VER::IPV4, "119.119.0.1", 2023);
ObCurTraceId::init(ip_port);
GCONF._ob_trans_rpc_timeout = 500;
ObClockGenerator::init();
omt::the_ctrl_of_enable_transaction_free_route = true;
common::ObClusterVersion::get_instance().update_cluster_version(CLUSTER_VERSION_4_1_0_0);
const testing::TestInfo* const test_info =
testing::UnitTest::GetInstance()->current_test_info();
MTL_MEM_ALLOC_MGR.init();
struct rlimit rl;
getrlimit(RLIMIT_STACK, &rl);
setrlimit(20*1024*1024, &rl);
auto test_name = test_info->name();
_TRANS_LOG(INFO, ">>>> starting test : %s", test_name);
}
virtual void TearDown() override
{
const testing::TestInfo* const test_info =
testing::UnitTest::GetInstance()->current_test_info();
auto test_name = test_info->name();
_TRANS_LOG(INFO, ">>>> tearDown test : %s", test_name);
ObClockGenerator::destroy();
ObMallocAllocator::get_instance()->recycle_tenant_allocator(1001);
}
MsgBus bus_;
};
#define A_OK(k) ASSERT_EQ(OB_SUCCESS, k)
#define A_T(k) ASSERT_TRUE(k)
#define A_F(k) ASSERT_FALSE(k)
#define A_EQ(r, k) ASSERT_EQ(r, k)
#define A_NULL(r) ASSERT_EQ(r, (decltype(r))NULL)
#define A_NOT_NULL(r) ASSERT_NE(r, (decltype(r))NULL)
#define TXFR_CREATE_OBSERVER(addr, idx) \
MockObServer server##idx(idx ## 000, addr, 8888, bus_); \
A_OK(server##idx.start()); \
A_OK(proxy.add_backend(server##idx));
#define TXFR_TEST_SETUP(proxy_addr, server1, ...) \
MockObProxy proxy(proxy_addr, 4444, bus_); \
LST_DO2(TXFR_CREATE_OBSERVER, (), server1, ##__VA_ARGS__);
#define FIRST_1(a, ...) a
#define FIRST_0() 0
#define ARG_COUNT_(a, b, c, ...) c
#define ARG_COUNT(...) ARG_COUNT_(a, ##__VA_ARGS__, 1, 0)
#define EX_START_TX(...) \
{ \
ObResp resp; \
auto req = ObReq::mk_start_tx(); \
if (ARG_COUNT(__VA_ARGS__)) { \
req.set_stick_hash(CONCAT(FIRST_, ARG_COUNT(__VA_ARGS__))(__VA_ARGS__)); \
} \
A_OK(proxy.handle(req, resp)); \
A_T(resp.tx_id_.is_valid()); \
}
#define EX_READ(k, v) \
{ \
ObResp resp; \
auto req = ObReq::mk_read(k); \
A_OK(proxy.handle(req, resp)); \
A_OK(resp.ret_); \
A_EQ(resp.read_value_, v); \
}
#define EX_SERIALIZABLE_READ(k, v) \
{ \
ObResp resp; \
auto req = ObReq::mk_serializable_read(k); \
A_OK(proxy.handle(req, resp)); \
A_OK(resp.ret_); \
A_EQ(resp.read_value_, v); \
}
#define EX_WRITE(k, v) \
{ \
ObResp resp; \
auto req = ObReq::mk_write(k, v); \
A_OK(proxy.handle(req, resp)); \
A_OK(resp.ret_); \
}
#define EX_DUMMY_WRITE(k, v) \
{ \
ObResp resp; \
auto req = ObReq::mk_dummy_write(k, v); \
A_OK(proxy.handle(req, resp)); \
A_OK(resp.ret_); \
}
#define EX_SAVEPOINT(k, ...) \
{ \
char savepoint[10]; \
sprintf(savepoint, "%d", k); \
ObResp resp; \
auto req = ObReq::mk_savepoint(savepoint); \
if (ARG_COUNT(__VA_ARGS__)) { \
req.set_stick_hash(CONCAT(FIRST_, ARG_COUNT(__VA_ARGS__))(__VA_ARGS__)); \
} \
A_OK(proxy.handle(req, resp)); \
A_OK(resp.ret_); \
}
#define EX_ROLLBACK_SAVEPOINT(k) \
{ \
char savepoint[10]; \
sprintf(savepoint, "%d", k); \
ObResp resp; \
auto req = ObReq::mk_rollback_savepoint(savepoint); \
A_OK(proxy.handle(req, resp)); \
A_OK(resp.ret_); \
}
#define EX_RELEASE_SAVEPOINT(k) \
{ \
char savepoint[10]; \
sprintf(savepoint, "%d", k); \
ObResp resp; \
auto req = ObReq::mk_release_savepoint(savepoint); \
A_OK(proxy.handle(req, resp)); \
A_OK(resp.ret_); \
}
#define EX_COMMIT_TX() \
{ \
ObResp resp; \
auto req = ObReq::mk_commit_tx(); \
A_OK(proxy.handle(req, resp)); \
A_OK(resp.ret_); \
}
#define EX_ROLLBACK_TX() \
{ \
ObResp resp; \
auto req = ObReq::mk_rollback_tx(); \
A_OK(proxy.handle(req, resp)); \
A_OK(resp.ret_); \
}
#define CHECK_ALIVE(server) \
do { \
auto &session = server.session_; \
A_OK(server.tx_node_.tx_free_route_check_alive(session.txn_free_route_ctx_, *session.tx_desc_, session.get_sessid())); \
} while(0)
#define WRAP_BLOCK(x) { x; }
#define EXPECT_PROXY(hk, ...) \
do { \
auto func = [&](MockObProxy &proxy, ObReq &req, ObResp &resp, MockObProxy::BackendInfo *backend) -> void { \
LST_DO(WRAP_BLOCK, (), __VA_ARGS__); \
}; \
proxy.setup_hook(MockObProxy::HOOK::hk, func); \
} while(0)
#define EXPECT_SERVER(svr, hk, ...) \
do { \
auto func = [&](MockObServer &server, ObReq &req, ObResp &resp) -> void { \
auto &txn_free_route_ctx = server.session_.txn_free_route_ctx_; \
LST_DO(WRAP_BLOCK, (), __VA_ARGS__); \
}; \
svr.setup_hook(MockObServer::HOOK::hk, func); \
} while(0)
#define RESET_HOOK_(svr) svr.reset_hooks()
#define RESET_HOOK(svr, ...) \
LST_DO(RESET_HOOK_, (;), svr, __VA_ARGS__);
#define RESET_HOOKS_2() RESET_HOOK(proxy, server1, server2)
TEST_F(ObTestTxFreeRoute, basic)
{
TXFR_TEST_SETUP("127.0.0.1", "127.0.0.2", "127.0.0.3");
TRANS_LOG(INFO, "+++++ explicit start txn");
for (int k = 0; k< 3; k++) {
EX_START_TX();
for(int i = 0 ; i< 10; i++) {
EX_WRITE(k * 100 + i, k * 110 + i);
EX_READ(k * 100 + i, k * 110 + i);
}
EX_COMMIT_TX();
}
TRANS_LOG(INFO, "+++++ implicit start txn");
for (int k = 3; k< 6; k++) {
for(int i = 0 ; i< 10; i++) {
EX_READ((k-1) * 100 + i, (k-1) * 110 + i);
EX_WRITE(k * 100 + i, k * 110 + i);
EX_READ(k * 100 + i, k * 110 + i);
}
EX_COMMIT_TX();
}
TRANS_LOG(INFO, "+++++ implicit start txn with SERIALIZABLE");
for (int k = 6; k< 9; k++) {
for(int i = 0 ; i< 10; i++) {
if (k % 2 == 1) {
EX_SERIALIZABLE_READ((k-1) * 100 + i, (k-1) * 110 + i);
} else {
EX_READ((k-1) * 100 + i, (k-1) * 110 + i);
}
EX_WRITE(k * 100 + i, k * 110 + i);
if ((k + i) % 2 == 0) {
EX_SERIALIZABLE_READ(k * 100 + i, k * 110 + i);
} else {
EX_READ(k * 100 + i, k * 110 + i);
}
}
EX_COMMIT_TX();
}
TRANS_LOG(INFO, "+++++ savepoint");
for (int k = 6; k< 9; k++) {
for(int i = 0 ; i< 10; i++) {
EX_SAVEPOINT(k * 100 + i);
if (i == 5) {
EX_ROLLBACK_SAVEPOINT(k * 100);
} else if (i == 7) {
EX_RELEASE_SAVEPOINT(k * 100 + 6);
}
}
EX_COMMIT_TX();
}
}
TEST_F(ObTestTxFreeRoute, savepoint)
{
TXFR_TEST_SETUP("127.0.0.1", "127.0.0.2", "127.0.0.3");
EX_SAVEPOINT(100);
EX_WRITE(101, 1);
EX_COMMIT_TX();
}
TEST_F(ObTestTxFreeRoute, serializalbe_read)
{
}
TEST_F(ObTestTxFreeRoute, implicit_start_tx)
{
}
TEST_F(ObTestTxFreeRoute, participants_update)
{
}
TEST_F(ObTestTxFreeRoute, sync_static_reuse_idle)
{
TXFR_TEST_SETUP("127.0.0.1", "127.0.0.2", "127.0.0.3");
// write 100 = 1
EX_START_TX(2);
EX_WRITE(100, 1);
EX_COMMIT_TX();
// plain read, acquire IDLE tx on server 1
EX_READ(100, 1);
A_NOT_NULL(server1.session_.tx_desc_);
A_F(server1.session_.tx_desc_->tx_id_.is_valid());
// start tx on server2
EX_START_TX(2);
EX_WRITE(101, 2);
// write on server1, static state will synced to it
EX_WRITE(102, 3);
A_EQ(server2.session_.tx_desc_->tx_id_, server1.session_.tx_desc_->tx_id_);
EX_COMMIT_TX();
}
namespace transaction {
#ifdef NDEBUG
int64_t MAX_STATE_SIZE = 4096L;
#else
extern int64_t MAX_STATE_SIZE;
#endif
}
TEST_F(ObTestTxFreeRoute, txn_too_large_fallback)
{
TXFR_TEST_SETUP("127.0.0.1", "127.0.0.2", "127.0.0.3");
EX_START_TX(1);
oceanbase::transaction::MAX_STATE_SIZE = 1L;
EX_WRITE(100, 1);
EX_WRITE(101, 2);
EX_WRITE(102, 3);
EX_COMMIT_TX();
oceanbase::transaction::MAX_STATE_SIZE = 4096L;
}
TEST_F(ObTestTxFreeRoute, keep_alive)
{
TXFR_TEST_SETUP("127.0.0.1", "127.0.0.2", "127.0.0.3");
EX_START_TX(1); // txn on server 2
EX_WRITE(100, 1); // write 100 on server 1
EX_WRITE(101, 2); // write 101 on server 2
EX_COMMIT_TX(); // commit on server 2
// NOW txn has released on server 2, send check from server 1
A_NOT_NULL(server1.session_.tx_desc_);
CHECK_ALIVE(server1);
while(true) {
if (OB_ISNULL(server1.session_.tx_desc_)) {
break;
}
usleep(1000);
}
A_NULL(server1.session_.tx_desc_);
}
TEST_F(ObTestTxFreeRoute, upgrade_to_4_1)
{
TXFR_TEST_SETUP("127.0.0.1", "127.0.0.2", "127.0.0.3");
common::ObClusterVersion::get_instance().update_cluster_version(CLUSTER_VERSION_4_0_0_0);
EX_START_TX(1);
A_T(proxy.in_txn_);
A_F(proxy.can_free_route_);
EX_WRITE(100,1);
EX_WRITE(101,2);
EX_COMMIT_TX();
}
// TEST_F(ObTestTxFreeRoute, twiddle_knob_on_the_fly)
// {
// TXFR_TEST_SETUP("127.0.0.1", "127.0.0.2", "127.0.0.3");
// // previous is on
// EX_START_TX(1);
// EX_WRITE(102,2);
// EX_WRITE(103,2);
// EX_COMMIT_TX();
// omt::the_ctrl_of_enable_transaction_free_route = false;
// // off -> on
// EX_START_TX(1);
// A_T(proxy.in_txn_);
// A_F(proxy.can_free_route_);
// EX_WRITE(100, 1); // on server 2
// omt::the_ctrl_of_enable_transaction_free_route = true;
// A_T(proxy.in_txn_);
// A_F(proxy.can_free_route_);
// EX_WRITE(101, 1); // on server 2
// EX_COMMIT_TX();
// // on -> off
// EX_START_TX(1);
// A_T(proxy.in_txn_);
// A_T(proxy.can_free_route_);
// EX_WRITE(100, 1); // on server 1
// omt::the_ctrl_of_enable_transaction_free_route = false;
// A_T(proxy.in_txn_);
// A_T(proxy.can_free_route_);
// EX_WRITE(101, 1); // on server 2
// EX_COMMIT_TX();
// }
TEST_F(ObTestTxFreeRoute, sample)
{
TXFR_TEST_SETUP("127.0.0.1", "127.0.0.2", "127.0.0.3");
EXPECT_PROXY(POST_ROUTE,
A_EQ(backend->server_, &server2),
A_EQ(backend->tx_state_version_[0], 0));
EXPECT_SERVER(server2, RECV_REQ,
A_T(req.txn_free_route_support_));
EXPECT_SERVER(server2, PRE_HANDLE,
A_F(txn_free_route_ctx.in_txn_before_handle_request_),
A_F(txn_free_route_ctx.can_free_route_),
A_F(txn_free_route_ctx.static_changed_),
A_F(txn_free_route_ctx.dynamic_changed_),
A_F(txn_free_route_ctx.parts_changed_),
A_F(txn_free_route_ctx.extra_changed_));
EXPECT_SERVER(server2, POST_CALC_FREE_ROUTE,
A_F(txn_free_route_ctx.in_txn_before_handle_request_),
A_T(txn_free_route_ctx.can_free_route_),
A_T(txn_free_route_ctx.static_changed_),
A_T(txn_free_route_ctx.dynamic_changed_),
A_T(txn_free_route_ctx.parts_changed_),
A_T(txn_free_route_ctx.extra_changed_),
A_F(txn_free_route_ctx.flag_.is_tx_terminated()),
A_F(txn_free_route_ctx.flag_.is_fallback()));
EX_START_TX(1);
RESET_HOOKS_2();
EXPECT_PROXY(POST_ROUTE, A_EQ(backend->server_, &server2));
EXPECT_SERVER(server2, RECV_REQ,
A_T(req.tx_state_list_.empty()));
EXPECT_SERVER(server2, PRE_HANDLE,
A_T(txn_free_route_ctx.in_txn_before_handle_request_),
A_NOT_NULL(server.session_.tx_desc_));
EX_WRITE(101, 1);
RESET_HOOKS_2();
EXPECT_PROXY(POST_ROUTE, A_EQ(backend->server_, &server1));
EXPECT_SERVER(server1, RECV_REQ,
A_EQ(req.tx_state_list_.count(), 4));
EXPECT_SERVER(server1, PRE_HANDLE,
A_T(txn_free_route_ctx.in_txn_before_handle_request_),
A_NOT_NULL(server.session_.tx_desc_));
EX_WRITE(102, 1);
RESET_HOOKS_2();
auto prev_tx_id = server1.session_.tx_desc_->tx_id_;
EX_COMMIT_TX();
EXPECT_PROXY(POST_ROUTE, A_EQ(backend->server_, &server1));
EXPECT_SERVER(server1, RECV_REQ,
A_EQ(req.tx_state_list_.count(), 4),
// previouse txn's desc is exist
A_NOT_NULL(server.session_.tx_desc_),
A_EQ(server.session_.tx_desc_->tx_id_, prev_tx_id));
EXPECT_SERVER(server1, PRE_HANDLE,
A_F(txn_free_route_ctx.in_txn_before_handle_request_),
// txn should be released by synced terminated txn state
A_NULL(server.session_.tx_desc_));
EXPECT_SERVER(server1, POST_CALC_FREE_ROUTE,
A_F(txn_free_route_ctx.in_txn_before_handle_request_),
A_NOT_NULL(server.session_.tx_desc_));
EX_START_TX(2);
RESET_HOOKS_2();
EX_COMMIT_TX();
// replace extra: session has idle tx (with txid)
// eg:
// step 1: NODE1: DELETE FROM T WHERE 1 = 0; ==> create savepoint (will alloc txid)
// (because it has no modifies, txn state will in IDLE afte stmt executed)
// step 2: NODE2: SAVEPOINT s0 ==> will return EXTRA INFO, in trans is true (to proxy)
// step 3: NODE1: INSERT INTO T ==> will sync EXTRA INFO down, will replace session's tx on step 1
// step1
RESET_HOOKS_2();
EXPECT_PROXY(POST_ROUTE, A_EQ(backend->server_, &server2));
EXPECT_SERVER(server2, POST_CALC_FREE_ROUTE,
A_F(txn_free_route_ctx.in_txn_before_handle_request_),
A_F(txn_free_route_ctx.can_free_route_),
A_F(txn_free_route_ctx.static_changed_),
A_F(txn_free_route_ctx.dynamic_changed_),
A_F(txn_free_route_ctx.parts_changed_),
A_F(txn_free_route_ctx.extra_changed_),
A_F(txn_free_route_ctx.flag_.is_tx_terminated()));
EX_DUMMY_WRITE(201,100);
// step2
RESET_HOOKS_2();
EXPECT_PROXY(POST_ROUTE, A_EQ(backend->server_, &server1));
EXPECT_SERVER(server1, POST_CALC_FREE_ROUTE,
A_F(txn_free_route_ctx.in_txn_before_handle_request_),
A_T(txn_free_route_ctx.can_free_route_),
A_F(txn_free_route_ctx.static_changed_),
A_F(txn_free_route_ctx.dynamic_changed_),
A_F(txn_free_route_ctx.parts_changed_),
A_T(txn_free_route_ctx.extra_changed_),
A_F(txn_free_route_ctx.flag_.is_tx_terminated()));
EX_SAVEPOINT(202, 102);
// step3
RESET_HOOKS_2();
EXPECT_PROXY(POST_ROUTE, A_EQ(backend->server_, &server2));
EXPECT_SERVER(server2, RECV_REQ,
A_T(server.session_.tx_desc_->tx_id_.is_valid()),
A_OK(server.check_tx_sanity()),
req.hook_ctx_ = (void*)new ObTransID(server.session_.tx_desc_->tx_id_));
EXPECT_SERVER(server2, PRE_HANDLE,
A_T(txn_free_route_ctx.in_txn_before_handle_request_),
A_EQ(OB_ENTRY_NOT_EXIST, server.check_tx_exist((ObTransID*)req.hook_ctx_)),
A_EQ(OB_SUCCESS, server.check_tx_sanity()));
EX_WRITE(201, 1001);
RESET_HOOKS_2();
EX_COMMIT_TX();
}
} // oceanbase
int main(int argc, char **argv)
{
system("rm -rf test_tx_free_route.log*");
ObLogger &logger = ObLogger::get_logger();
logger.set_file_name("test_tx_free_route.log", true, false,
"test_tx_free_route.log", // rs
"test_tx_free_route.log", // election
"test_tx_free_route.log"); // audit
logger.set_log_level(OB_LOG_LEVEL_DEBUG);
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}